impl.go 133.3 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

C
Cai Yudong 已提交
17
package proxy
18 19 20

import (
	"context"
21
	"errors"
22
	"fmt"
C
cai.zhang 已提交
23
	"os"
24 25
	"strconv"

26
	"github.com/milvus-io/milvus/internal/util/timerecord"
27

28
	"go.uber.org/zap"
S
sunby 已提交
29

30
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/log"
32
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
33
	"github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
34 35 36 37 38 39
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
40
	"github.com/milvus-io/milvus/internal/proto/schemapb"
41
	"github.com/milvus-io/milvus/internal/util/distance"
42 43 44 45
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
46
	"github.com/milvus-io/milvus/internal/util/typeutil"
47 48
)

49 50
const moduleName = "Proxy"

51
// UpdateStateCode updates the state code of Proxy.
C
Cai Yudong 已提交
52
func (node *Proxy) UpdateStateCode(code internalpb.StateCode) {
53
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
54 55
}

56
// GetComponentStates get state of Proxy.
C
Cai Yudong 已提交
57
func (node *Proxy) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
58 59 60 61 62 63 64 65 66 67 68 69
	stats := &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	code, ok := node.stateCode.Load().(internalpb.StateCode)
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
70
		return stats, nil
G
godchen 已提交
71
	}
72 73 74 75
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
G
godchen 已提交
76
	info := &internalpb.ComponentInfo{
77 78
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
79
		Role:      typeutil.ProxyRole,
G
godchen 已提交
80 81 82 83 84 85
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
86
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
87
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
88 89 90 91 92 93 94 95 96
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

97
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
98
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
99 100
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to invalidate collection meta cache",
101
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
102 103 104
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

105
	collectionName := request.CollectionName
N
neza2017 已提交
106 107 108
	if globalMetaCache != nil {
		globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
	}
109
	logutil.Logger(ctx).Debug("complete to invalidate collection meta cache",
110
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
111 112 113
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

114
	return &commonpb.Status{
115
		ErrorCode: commonpb.ErrorCode_Success,
116 117
		Reason:    "",
	}, nil
118 119
}

120
// ReleaseDQLMessageStream release the query message stream of specific collection.
C
Cai Yudong 已提交
121
func (node *Proxy) ReleaseDQLMessageStream(ctx context.Context, request *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
122 123
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to release DQL message strem",
124
		zap.Any("role", typeutil.ProxyRole),
125 126 127
		zap.Any("db", request.DbID),
		zap.Any("collection", request.CollectionID))

128 129 130 131
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

132 133
	_ = node.chMgr.removeDQLStream(request.CollectionID)

134
	logutil.Logger(ctx).Debug("complete to release DQL message stream",
135
		zap.Any("role", typeutil.ProxyRole),
136 137 138 139 140 141 142 143 144
		zap.Any("db", request.DbID),
		zap.Any("collection", request.CollectionID))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

145
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
146
// CreateCollection create a collection by the schema.
C
Cai Yudong 已提交
147
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
148 149 150
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
151 152 153 154

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
155 156 157 158
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
159

160
	cct := &createCollectionTask{
S
sunby 已提交
161
		ctx:                     ctx,
162 163
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
164
		rootCoord:               node.rootCoord,
165 166
	}

167 168 169
	// avoid data race
	lenOfSchema := len(request.Schema)

170 171
	log.Debug(
		rpcReceived(method),
172
		zap.String("traceID", traceID),
173
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
174 175
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
176
		zap.Int("len(schema)", lenOfSchema),
177 178
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
179

180 181 182
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
183 184
			zap.Error(err),
			zap.String("traceID", traceID),
185
			zap.String("role", typeutil.ProxyRole),
186 187 188
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Int("len(schema)", lenOfSchema),
189 190
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
191

192
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
193
		return &commonpb.Status{
194
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
195 196 197 198
			Reason:    err.Error(),
		}, nil
	}

199 200
	log.Debug(
		rpcEnqueued(method),
201
		zap.String("traceID", traceID),
202
		zap.String("role", typeutil.ProxyRole),
203 204 205
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
206 207
		zap.Uint64("timestamp", request.Base.Timestamp),
		zap.String("db", request.DbName),
208 209
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
210 211
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
212

213 214 215
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
216
			zap.Error(err),
217
			zap.String("traceID", traceID),
218
			zap.String("role", typeutil.ProxyRole),
219 220 221
			zap.Int64("MsgID", cct.ID()),
			zap.Uint64("BeginTs", cct.BeginTs()),
			zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
222 223
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
224
			zap.Int("len(schema)", lenOfSchema),
225 226
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
D
dragondriver 已提交
227

228
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()
229
		return &commonpb.Status{
230
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
231 232 233 234
			Reason:    err.Error(),
		}, nil
	}

235 236
	log.Debug(
		rpcDone(method),
237
		zap.String("traceID", traceID),
238
		zap.String("role", typeutil.ProxyRole),
239 240 241 242 243 244
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
245 246
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
247

248 249
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
250 251 252
	return cct.result, nil
}

253
// DropCollection drop a collection.
C
Cai Yudong 已提交
254
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
255 256 257
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
258 259 260 261

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
262 263 264
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
265

266
	dct := &dropCollectionTask{
S
sunby 已提交
267
		ctx:                   ctx,
268 269
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
270
		rootCoord:             node.rootCoord,
271
		chMgr:                 node.chMgr,
S
sunby 已提交
272
		chTicker:              node.chTicker,
273 274
	}

275 276
	log.Debug("DropCollection received",
		zap.String("traceID", traceID),
277
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
278 279
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
280 281 282 283 284

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
285
			zap.String("role", typeutil.ProxyRole),
286 287 288
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

289
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
290
		return &commonpb.Status{
291
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
292 293 294 295
			Reason:    err.Error(),
		}, nil
	}

296 297
	log.Debug("DropCollection enqueued",
		zap.String("traceID", traceID),
298
		zap.String("role", typeutil.ProxyRole),
299 300 301
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
302 303
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
304 305 306

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
307
			zap.Error(err),
308
			zap.String("traceID", traceID),
309
			zap.String("role", typeutil.ProxyRole),
310 311 312
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTs", dct.BeginTs()),
			zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
313 314 315
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

316
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()
317
		return &commonpb.Status{
318
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
319 320 321 322
			Reason:    err.Error(),
		}, nil
	}

323 324
	log.Debug("DropCollection done",
		zap.String("traceID", traceID),
325
		zap.String("role", typeutil.ProxyRole),
326 327 328 329 330 331
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

332 333
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
334 335 336
	return dct.result, nil
}

337
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
338
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
339 340 341 342 343
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
344 345 346 347

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
348 349 350
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
351
		metrics.TotalLabel).Inc()
352 353 354

	log.Debug("HasCollection received",
		zap.String("traceID", traceID),
355
		zap.String("role", typeutil.ProxyRole),
356 357 358
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

359
	hct := &hasCollectionTask{
S
sunby 已提交
360
		ctx:                  ctx,
361 362
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
363
		rootCoord:            node.rootCoord,
364 365
	}

366 367 368 369
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
370
			zap.String("role", typeutil.ProxyRole),
371 372 373
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

374
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
375
			metrics.AbandonLabel).Inc()
376 377
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
378
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
379 380 381 382 383
				Reason:    err.Error(),
			},
		}, nil
	}

384 385
	log.Debug("HasCollection enqueued",
		zap.String("traceID", traceID),
386
		zap.String("role", typeutil.ProxyRole),
387 388 389
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
390 391
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
392 393 394

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
395
			zap.Error(err),
396
			zap.String("traceID", traceID),
397
			zap.String("role", typeutil.ProxyRole),
398 399 400
			zap.Int64("MsgID", hct.ID()),
			zap.Uint64("BeginTS", hct.BeginTs()),
			zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
401 402 403
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

404
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
405
			metrics.FailLabel).Inc()
406 407
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
408
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
409 410 411 412 413
				Reason:    err.Error(),
			},
		}, nil
	}

414 415
	log.Debug("HasCollection done",
		zap.String("traceID", traceID),
416
		zap.String("role", typeutil.ProxyRole),
417 418 419 420 421 422
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

423
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
424 425
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
426 427 428
	return hct.result, nil
}

429
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
430
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
431 432 433
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
434 435 436 437

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
438 439
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
440

441
	lct := &loadCollectionTask{
S
sunby 已提交
442
		ctx:                   ctx,
443 444
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
445
		queryCoord:            node.queryCoord,
446 447
	}

448 449
	log.Debug("LoadCollection received",
		zap.String("traceID", traceID),
450
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
451 452
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
453 454 455 456 457

	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
458
			zap.String("role", typeutil.ProxyRole),
459 460 461
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

462
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
463
			metrics.AbandonLabel).Inc()
464
		return &commonpb.Status{
465
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
466 467 468
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
469

470 471
	log.Debug("LoadCollection enqueued",
		zap.String("traceID", traceID),
472
		zap.String("role", typeutil.ProxyRole),
473 474 475
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
476 477
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
478 479 480

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
481
			zap.Error(err),
482
			zap.String("traceID", traceID),
483
			zap.String("role", typeutil.ProxyRole),
484 485 486
			zap.Int64("MsgID", lct.ID()),
			zap.Uint64("BeginTS", lct.BeginTs()),
			zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
487 488 489
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

490
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
491
			metrics.TotalLabel).Inc()
492
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
493
			metrics.FailLabel).Inc()
494
		return &commonpb.Status{
495
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
496 497 498 499
			Reason:    err.Error(),
		}, nil
	}

500 501
	log.Debug("LoadCollection done",
		zap.String("traceID", traceID),
502
		zap.String("role", typeutil.ProxyRole),
503 504 505 506 507 508
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

509
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
510
		metrics.TotalLabel).Inc()
511
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
512 513
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
514
	return lct.result, nil
515 516
}

517
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
518
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
519 520 521
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
522

523
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleaseCollection")
524 525
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
526 527
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
528

529
	rct := &releaseCollectionTask{
S
sunby 已提交
530
		ctx:                      ctx,
531 532
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
533
		queryCoord:               node.queryCoord,
534
		chMgr:                    node.chMgr,
535 536
	}

537 538
	log.Debug(
		rpcReceived(method),
539
		zap.String("traceID", traceID),
540
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
541 542
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
543 544

	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
545 546
		log.Warn(
			rpcFailedToEnqueue(method),
547 548
			zap.Error(err),
			zap.String("traceID", traceID),
549
			zap.String("role", typeutil.ProxyRole),
550 551 552
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

553
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
554
			metrics.AbandonLabel).Inc()
555
		return &commonpb.Status{
556
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
557 558 559 560
			Reason:    err.Error(),
		}, nil
	}

561 562
	log.Debug(
		rpcEnqueued(method),
563
		zap.String("traceID", traceID),
564
		zap.String("role", typeutil.ProxyRole),
565 566 567
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
568 569
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
570 571

	if err := rct.WaitToFinish(); err != nil {
572 573
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
574
			zap.Error(err),
575
			zap.String("traceID", traceID),
576
			zap.String("role", typeutil.ProxyRole),
577 578 579
			zap.Int64("MsgID", rct.ID()),
			zap.Uint64("BeginTS", rct.BeginTs()),
			zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
580 581 582
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

583
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
584
			metrics.TotalLabel).Inc()
585
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
586
			metrics.FailLabel).Inc()
587
		return &commonpb.Status{
588
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
589 590 591 592
			Reason:    err.Error(),
		}, nil
	}

593 594
	log.Debug(
		rpcDone(method),
595
		zap.String("traceID", traceID),
596
		zap.String("role", typeutil.ProxyRole),
597 598 599 600 601 602
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

603
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
604
		metrics.TotalLabel).Inc()
605
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
606 607
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
608
	return rct.result, nil
609 610
}

611
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
612
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
613 614 615 616 617
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
618

619
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeCollection")
620 621
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
622 623
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
624

625
	dct := &describeCollectionTask{
S
sunby 已提交
626
		ctx:                       ctx,
627 628
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
629
		rootCoord:                 node.rootCoord,
630 631
	}

632 633
	log.Debug("DescribeCollection received",
		zap.String("traceID", traceID),
634
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
635 636
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
637 638 639 640 641

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
642
			zap.String("role", typeutil.ProxyRole),
643 644 645
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

646
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
647
			metrics.AbandonLabel).Inc()
648 649
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
650
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
651 652 653 654 655
				Reason:    err.Error(),
			},
		}, nil
	}

656 657
	log.Debug("DescribeCollection enqueued",
		zap.String("traceID", traceID),
658
		zap.String("role", typeutil.ProxyRole),
659 660 661
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
662 663
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
664 665 666

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
667
			zap.Error(err),
668
			zap.String("traceID", traceID),
669
			zap.String("role", typeutil.ProxyRole),
670 671 672
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTS", dct.BeginTs()),
			zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
673 674 675
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

676
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
677
			metrics.TotalLabel).Inc()
678
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
679
			metrics.FailLabel).Inc()
680

681 682
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
683
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
684 685 686 687 688
				Reason:    err.Error(),
			},
		}, nil
	}

689 690
	log.Debug("DescribeCollection done",
		zap.String("traceID", traceID),
691
		zap.String("role", typeutil.ProxyRole),
692 693 694 695 696 697
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

698
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
699
		metrics.TotalLabel).Inc()
700
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
701 702
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
703 704 705
	return dct.result, nil
}

706
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
707
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
708 709 710 711 712
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
713 714 715 716

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCollectionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
717 718
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
719

720
	g := &getCollectionStatisticsTask{
G
godchen 已提交
721 722 723
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
724
		dataCoord:                      node.dataCoord,
725 726
	}

727 728
	log.Debug("GetCollectionStatistics received",
		zap.String("traceID", traceID),
729
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
730 731
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
732 733 734 735 736

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn("GetCollectionStatistics failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
737
			zap.String("role", typeutil.ProxyRole),
738 739 740
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

741
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
742
			metrics.AbandonLabel).Inc()
743

G
godchen 已提交
744
		return &milvuspb.GetCollectionStatisticsResponse{
745
			Status: &commonpb.Status{
746
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
747 748 749 750 751
				Reason:    err.Error(),
			},
		}, nil
	}

752 753
	log.Debug("GetCollectionStatistics enqueued",
		zap.String("traceID", traceID),
754
		zap.String("role", typeutil.ProxyRole),
755 756 757
		zap.Int64("MsgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
758 759
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
760 761 762

	if err := g.WaitToFinish(); err != nil {
		log.Warn("GetCollectionStatistics failed to WaitToFinish",
D
dragondriver 已提交
763
			zap.Error(err),
764
			zap.String("traceID", traceID),
765
			zap.String("role", typeutil.ProxyRole),
766 767 768
			zap.Int64("MsgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
769 770 771
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

772
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
773
			metrics.TotalLabel).Inc()
774
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
775
			metrics.FailLabel).Inc()
776

G
godchen 已提交
777
		return &milvuspb.GetCollectionStatisticsResponse{
778
			Status: &commonpb.Status{
779
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
780 781 782 783 784
				Reason:    err.Error(),
			},
		}, nil
	}

785 786
	log.Debug("GetCollectionStatistics done",
		zap.String("traceID", traceID),
787
		zap.String("role", typeutil.ProxyRole),
788 789 790 791 792 793
		zap.Int64("MsgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

794
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
795
		metrics.TotalLabel).Inc()
796
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
797 798
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
799
	return g.result, nil
800 801
}

802
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
803
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
804 805 806 807 808
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
809 810 811 812
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()

813
	sct := &showCollectionsTask{
G
godchen 已提交
814 815 816
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
817
		queryCoord:             node.queryCoord,
818
		rootCoord:              node.rootCoord,
819 820
	}

821
	log.Debug("ShowCollections received",
822
		zap.String("role", typeutil.ProxyRole),
823 824 825 826 827 828
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
		zap.Any("CollectionNames", request.CollectionNames),
	)

829
	err := node.sched.ddQueue.Enqueue(sct)
830
	if err != nil {
831 832
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
833
			zap.String("role", typeutil.ProxyRole),
834 835 836 837 838 839
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

840
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
841
		return &milvuspb.ShowCollectionsResponse{
842
			Status: &commonpb.Status{
843
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
844 845 846 847 848
				Reason:    err.Error(),
			},
		}, nil
	}

849
	log.Debug("ShowCollections enqueued",
850
		zap.String("role", typeutil.ProxyRole),
851
		zap.Int64("MsgID", sct.ID()),
852
		zap.String("DbName", sct.ShowCollectionsRequest.DbName),
853
		zap.Uint64("TimeStamp", request.TimeStamp),
854 855 856
		zap.String("ShowType", sct.ShowCollectionsRequest.Type.String()),
		zap.Any("CollectionNames", sct.ShowCollectionsRequest.CollectionNames),
	)
D
dragondriver 已提交
857

858 859
	err = sct.WaitToFinish()
	if err != nil {
860 861
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
862
			zap.String("role", typeutil.ProxyRole),
863 864 865 866 867 868 869
			zap.Int64("MsgID", sct.ID()),
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

870 871
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

G
godchen 已提交
872
		return &milvuspb.ShowCollectionsResponse{
873
			Status: &commonpb.Status{
874
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
875 876 877 878 879
				Reason:    err.Error(),
			},
		}, nil
	}

880
	log.Debug("ShowCollections Done",
881
		zap.String("role", typeutil.ProxyRole),
882 883 884 885
		zap.Int64("MsgID", sct.ID()),
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
886 887
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
888

889 890
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
891 892 893
	return sct.result, nil
}

894
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
895
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
896 897 898
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
899

900
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreatePartition")
901 902
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
903 904 905
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
906

907
	cpt := &createPartitionTask{
S
sunby 已提交
908
		ctx:                    ctx,
909 910
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
911
		rootCoord:              node.rootCoord,
912 913 914
		result:                 nil,
	}

915 916 917
	log.Debug(
		rpcReceived("CreatePartition"),
		zap.String("traceID", traceID),
918
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
919 920 921
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
922 923 924 925 926 927

	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
			zap.Error(err),
			zap.String("traceID", traceID),
928
			zap.String("role", typeutil.ProxyRole),
929 930 931 932
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

933 934
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()

935
		return &commonpb.Status{
936
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
937 938 939
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
940

941 942 943
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.String("traceID", traceID),
944
		zap.String("role", typeutil.ProxyRole),
945 946 947
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
948 949 950
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
951 952 953 954

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
955
			zap.Error(err),
956
			zap.String("traceID", traceID),
957
			zap.String("role", typeutil.ProxyRole),
958 959 960
			zap.Int64("MsgID", cpt.ID()),
			zap.Uint64("BeginTS", cpt.BeginTs()),
			zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
961 962 963 964
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

965 966
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

967
		return &commonpb.Status{
968
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
969 970 971
			Reason:    err.Error(),
		}, nil
	}
972 973 974 975

	log.Debug(
		rpcDone("CreatePartition"),
		zap.String("traceID", traceID),
976
		zap.String("role", typeutil.ProxyRole),
977 978 979 980 981 982 983
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

984 985
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
986 987 988
	return cpt.result, nil
}

989
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
990
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
991 992 993
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
994

995
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropPartition")
996 997
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
998 999 1000
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
1001

1002
	dpt := &dropPartitionTask{
S
sunby 已提交
1003
		ctx:                  ctx,
1004 1005
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
1006
		rootCoord:            node.rootCoord,
1007 1008 1009
		result:               nil,
	}

1010 1011 1012
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1013
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1014 1015 1016
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1017 1018 1019 1020 1021 1022

	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1023
			zap.String("role", typeutil.ProxyRole),
1024 1025 1026 1027
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1028 1029
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()

1030
		return &commonpb.Status{
1031
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1032 1033 1034
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1035

1036 1037 1038
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1039
		zap.String("role", typeutil.ProxyRole),
1040 1041 1042
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1043 1044 1045
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1046 1047 1048 1049

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1050
			zap.Error(err),
1051
			zap.String("traceID", traceID),
1052
			zap.String("role", typeutil.ProxyRole),
1053 1054 1055
			zap.Int64("MsgID", dpt.ID()),
			zap.Uint64("BeginTS", dpt.BeginTs()),
			zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1056 1057 1058 1059
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1060 1061
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

1062
		return &commonpb.Status{
1063
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1064 1065 1066
			Reason:    err.Error(),
		}, nil
	}
1067 1068 1069 1070

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1071
		zap.String("role", typeutil.ProxyRole),
1072 1073 1074 1075 1076 1077 1078
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

1079 1080
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1081 1082 1083
	return dpt.result, nil
}

1084
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1085
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1086 1087 1088 1089 1090
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1091

D
dragondriver 已提交
1092
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasPartition")
D
dragondriver 已提交
1093 1094
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1095 1096 1097 1098
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1099
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1100

1101
	hpt := &hasPartitionTask{
S
sunby 已提交
1102
		ctx:                 ctx,
1103 1104
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1105
		rootCoord:           node.rootCoord,
1106 1107 1108
		result:              nil,
	}

D
dragondriver 已提交
1109 1110 1111
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1112
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1113 1114 1115
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1116 1117 1118 1119 1120 1121

	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1122
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1123 1124 1125 1126
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1127
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1128
			metrics.AbandonLabel).Inc()
1129

1130 1131
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1132
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1133 1134 1135 1136 1137
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1138

D
dragondriver 已提交
1139 1140 1141
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1142
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1143 1144 1145
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1146 1147 1148
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1149 1150 1151 1152

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1153
			zap.Error(err),
D
dragondriver 已提交
1154
			zap.String("traceID", traceID),
1155
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1156 1157 1158
			zap.Int64("MsgID", hpt.ID()),
			zap.Uint64("BeginTS", hpt.BeginTs()),
			zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1159 1160 1161 1162
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1163
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1164
			metrics.FailLabel).Inc()
1165

1166 1167
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1168
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1169 1170 1171 1172 1173
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1174 1175 1176 1177

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1178
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1179 1180 1181 1182 1183 1184 1185
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

1186
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1187 1188
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1189 1190 1191
	return hpt.result, nil
}

1192
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1193
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1194 1195 1196
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1197

D
dragondriver 已提交
1198
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadPartitions")
1199 1200
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1201 1202
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
1203

1204
	lpt := &loadPartitionsTask{
G
godchen 已提交
1205 1206 1207
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1208
		queryCoord:            node.queryCoord,
1209 1210
	}

1211 1212 1213
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1214
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1215 1216 1217
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1218 1219 1220 1221 1222 1223

	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1224
			zap.String("role", typeutil.ProxyRole),
1225 1226 1227 1228
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

1229
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1230
			metrics.AbandonLabel).Inc()
1231

1232
		return &commonpb.Status{
1233
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1234 1235 1236 1237
			Reason:    err.Error(),
		}, nil
	}

1238 1239 1240
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1241
		zap.String("role", typeutil.ProxyRole),
1242 1243 1244
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1245 1246 1247
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1248 1249 1250 1251

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1252
			zap.Error(err),
1253
			zap.String("traceID", traceID),
1254
			zap.String("role", typeutil.ProxyRole),
1255 1256 1257
			zap.Int64("MsgID", lpt.ID()),
			zap.Uint64("BeginTS", lpt.BeginTs()),
			zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1258 1259 1260 1261
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

1262
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1263
			metrics.TotalLabel).Inc()
1264
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1265
			metrics.FailLabel).Inc()
1266

1267
		return &commonpb.Status{
1268
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1269 1270 1271 1272
			Reason:    err.Error(),
		}, nil
	}

1273 1274 1275
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1276
		zap.String("role", typeutil.ProxyRole),
1277 1278 1279 1280 1281 1282 1283
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

1284
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1285
		metrics.TotalLabel).Inc()
1286
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1287 1288
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1289
	return lpt.result, nil
1290 1291
}

1292
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1293
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1294 1295 1296
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1297 1298 1299 1300 1301

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleasePartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1302
	rpt := &releasePartitionsTask{
G
godchen 已提交
1303 1304 1305
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1306
		queryCoord:               node.queryCoord,
1307 1308
	}

1309
	method := "ReleasePartitions"
1310
	tr := timerecord.NewTimeRecorder(method)
1311 1312 1313 1314

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1315
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1316 1317 1318
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1319 1320 1321 1322 1323 1324

	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1325
			zap.String("role", typeutil.ProxyRole),
1326 1327 1328 1329
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

1330
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1331
			metrics.AbandonLabel).Inc()
1332

1333
		return &commonpb.Status{
1334
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1335 1336 1337 1338
			Reason:    err.Error(),
		}, nil
	}

1339 1340 1341
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1342
		zap.String("role", typeutil.ProxyRole),
1343 1344 1345
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1346 1347 1348
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1349 1350 1351 1352

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1353
			zap.Error(err),
1354
			zap.String("traceID", traceID),
1355
			zap.String("role", typeutil.ProxyRole),
1356 1357 1358
			zap.Int64("msgID", rpt.Base.MsgID),
			zap.Uint64("BeginTS", rpt.BeginTs()),
			zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1359 1360 1361 1362
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

1363
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1364
			metrics.TotalLabel).Inc()
1365
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1366
			metrics.FailLabel).Inc()
1367

1368
		return &commonpb.Status{
1369
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1370 1371 1372 1373
			Reason:    err.Error(),
		}, nil
	}

1374 1375 1376
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1377
		zap.String("role", typeutil.ProxyRole),
1378 1379 1380 1381 1382 1383 1384
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

1385
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1386
		metrics.TotalLabel).Inc()
1387
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1388 1389
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1390
	return rpt.result, nil
1391 1392
}

1393
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1394
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1395 1396 1397 1398 1399
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1400 1401 1402 1403 1404

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPartitionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1405
	g := &getPartitionStatisticsTask{
1406 1407 1408
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1409
		dataCoord:                     node.dataCoord,
1410 1411
	}

1412
	method := "GetPartitionStatistics"
1413
	tr := timerecord.NewTimeRecorder(method)
1414 1415 1416 1417

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1418
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1419 1420 1421
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1422 1423 1424 1425 1426 1427

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1428
			zap.String("role", typeutil.ProxyRole),
1429 1430 1431 1432
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1433
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1434
			metrics.AbandonLabel).Inc()
1435

1436 1437 1438 1439 1440 1441 1442 1443
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1444 1445 1446
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1447
		zap.String("role", typeutil.ProxyRole),
1448 1449 1450
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
1451 1452 1453
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1454 1455 1456 1457

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1458
			zap.Error(err),
1459
			zap.String("traceID", traceID),
1460
			zap.String("role", typeutil.ProxyRole),
1461 1462 1463
			zap.Int64("msgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
1464 1465 1466 1467
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1468
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1469
			metrics.TotalLabel).Inc()
1470
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1471
			metrics.FailLabel).Inc()
1472

1473 1474 1475 1476 1477 1478 1479 1480
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1481 1482 1483
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1484
		zap.String("role", typeutil.ProxyRole),
1485 1486 1487 1488 1489 1490 1491
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

1492
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1493
		metrics.TotalLabel).Inc()
1494
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1495 1496
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1497
	return g.result, nil
1498 1499
}

1500
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1501
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1502 1503 1504 1505 1506
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1507 1508 1509 1510 1511

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1512
	spt := &showPartitionsTask{
G
godchen 已提交
1513 1514 1515
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1516
		rootCoord:             node.rootCoord,
1517
		queryCoord:            node.queryCoord,
G
godchen 已提交
1518
		result:                nil,
1519 1520
	}

1521
	method := "ShowPartitions"
1522 1523 1524
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1525
		metrics.TotalLabel).Inc()
1526 1527 1528 1529

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1530
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1531
		zap.Any("request", request))
1532 1533 1534 1535 1536 1537

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1538
			zap.String("role", typeutil.ProxyRole),
1539 1540
			zap.Any("request", request))

1541
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1542
			metrics.AbandonLabel).Inc()
1543

G
godchen 已提交
1544
		return &milvuspb.ShowPartitionsResponse{
1545
			Status: &commonpb.Status{
1546
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1547 1548 1549 1550 1551
				Reason:    err.Error(),
			},
		}, nil
	}

1552 1553 1554
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1555
		zap.String("role", typeutil.ProxyRole),
1556 1557 1558
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1559 1560
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1561 1562 1563 1564 1565
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1566
			zap.Error(err),
1567
			zap.String("traceID", traceID),
1568
			zap.String("role", typeutil.ProxyRole),
1569 1570 1571 1572 1573 1574
			zap.Int64("msgID", spt.ID()),
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1575

1576
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1577
			metrics.FailLabel).Inc()
1578

G
godchen 已提交
1579
		return &milvuspb.ShowPartitionsResponse{
1580
			Status: &commonpb.Status{
1581
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1582 1583 1584 1585
				Reason:    err.Error(),
			},
		}, nil
	}
1586 1587 1588 1589

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1590
		zap.String("role", typeutil.ProxyRole),
1591 1592 1593 1594 1595 1596 1597
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

1598
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1599 1600
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1601 1602 1603
	return spt.result, nil
}

1604
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1605
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1606 1607 1608
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1609 1610 1611 1612 1613

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1614
	cit := &createIndexTask{
S
sunby 已提交
1615
		ctx:                ctx,
1616 1617
		Condition:          NewTaskCondition(ctx),
		CreateIndexRequest: request,
1618
		rootCoord:          node.rootCoord,
1619 1620
	}

D
dragondriver 已提交
1621
	method := "CreateIndex"
1622
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
1623 1624 1625 1626

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1627
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1628 1629 1630 1631
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1632 1633 1634 1635 1636 1637

	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1638
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1639 1640 1641 1642 1643
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.Any("extra_params", request.ExtraParams))

1644
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1645
			metrics.AbandonLabel).Inc()
1646

1647
		return &commonpb.Status{
1648
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1649 1650 1651 1652
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1653 1654 1655
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1656
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1657 1658 1659
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1660 1661 1662 1663
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1664 1665 1666 1667

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1668
			zap.Error(err),
D
dragondriver 已提交
1669
			zap.String("traceID", traceID),
1670
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1671 1672 1673
			zap.Int64("MsgID", cit.ID()),
			zap.Uint64("BeginTs", cit.BeginTs()),
			zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1674 1675 1676 1677 1678
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.Any("extra_params", request.ExtraParams))

1679
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1680
			metrics.TotalLabel).Inc()
1681
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1682
			metrics.FailLabel).Inc()
1683

1684
		return &commonpb.Status{
1685
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1686 1687 1688 1689
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1690 1691 1692
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1693
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1694 1695 1696 1697 1698 1699 1700 1701
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))

1702
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1703
		metrics.TotalLabel).Inc()
1704
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1705 1706
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1707 1708 1709
	return cit.result, nil
}

1710
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1711
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1712 1713 1714 1715 1716
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1717 1718 1719 1720 1721

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1722
	dit := &describeIndexTask{
S
sunby 已提交
1723
		ctx:                  ctx,
1724 1725
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1726
		rootCoord:            node.rootCoord,
1727 1728
	}

1729 1730 1731
	method := "DescribeIndex"
	// avoid data race
	indexName := request.IndexName
1732
	tr := timerecord.NewTimeRecorder(method)
1733 1734 1735 1736

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1737
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1738 1739 1740
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1741 1742 1743 1744 1745 1746 1747
		zap.String("index name", indexName))

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1748
			zap.String("role", typeutil.ProxyRole),
1749 1750 1751 1752 1753
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", indexName))

1754
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1755
			metrics.AbandonLabel).Inc()
1756

1757 1758
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1759
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1760 1761 1762 1763 1764
				Reason:    err.Error(),
			},
		}, nil
	}

1765 1766 1767
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1768
		zap.String("role", typeutil.ProxyRole),
1769 1770 1771
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1772 1773 1774
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1775 1776 1777 1778 1779
		zap.String("index name", indexName))

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1780
			zap.Error(err),
1781
			zap.String("traceID", traceID),
1782
			zap.String("role", typeutil.ProxyRole),
1783 1784 1785
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1786 1787 1788
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
1789
			zap.String("index name", indexName))
D
dragondriver 已提交
1790

Z
zhenshan.cao 已提交
1791 1792 1793 1794
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
1795
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1796
			metrics.TotalLabel).Inc()
1797
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1798
			metrics.FailLabel).Inc()
1799

1800 1801
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1802
				ErrorCode: errCode,
1803 1804 1805 1806 1807
				Reason:    err.Error(),
			},
		}, nil
	}

1808 1809 1810
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1811
		zap.String("role", typeutil.ProxyRole),
1812 1813 1814 1815 1816 1817 1818 1819
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", indexName))

1820
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1821
		metrics.TotalLabel).Inc()
1822
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1823 1824
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1825 1826 1827
	return dit.result, nil
}

1828
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1829
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1830 1831 1832
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1833 1834 1835 1836 1837

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1838
	dit := &dropIndexTask{
S
sunby 已提交
1839
		ctx:              ctx,
B
BossZou 已提交
1840 1841
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1842
		rootCoord:        node.rootCoord,
B
BossZou 已提交
1843
	}
G
godchen 已提交
1844

D
dragondriver 已提交
1845
	method := "DropIndex"
1846
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
1847 1848 1849 1850

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1851
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1852 1853 1854 1855 1856
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

D
dragondriver 已提交
1857 1858 1859 1860 1861
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1862
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1863 1864 1865 1866
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
1867
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1868
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1869

B
BossZou 已提交
1870
		return &commonpb.Status{
1871
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1872 1873 1874
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1875

D
dragondriver 已提交
1876 1877 1878
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1879
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1880 1881 1882
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1883 1884 1885 1886
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
D
dragondriver 已提交
1887 1888 1889 1890

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1891
			zap.Error(err),
D
dragondriver 已提交
1892
			zap.String("traceID", traceID),
1893
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1894 1895 1896
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1897 1898 1899 1900 1901
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

1902
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1903
			metrics.TotalLabel).Inc()
1904
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1905
			metrics.FailLabel).Inc()
1906

B
BossZou 已提交
1907
		return &commonpb.Status{
1908
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1909 1910 1911
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1912 1913 1914 1915

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1916
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1917 1918 1919 1920 1921 1922 1923 1924
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

1925
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1926
		metrics.TotalLabel).Inc()
1927
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1928 1929
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1930 1931 1932
	return dit.result, nil
}

1933 1934
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
C
Cai Yudong 已提交
1935
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1936 1937 1938 1939 1940
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1941 1942 1943 1944 1945

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1946
	gibpt := &getIndexBuildProgressTask{
1947 1948 1949
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1950 1951
		indexCoord:                   node.indexCoord,
		rootCoord:                    node.rootCoord,
1952
		dataCoord:                    node.dataCoord,
1953 1954
	}

1955
	method := "GetIndexBuildProgress"
1956
	tr := timerecord.NewTimeRecorder(method)
1957 1958 1959 1960

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1961
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1962 1963 1964 1965
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1966 1967 1968 1969 1970 1971

	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1972
			zap.String("role", typeutil.ProxyRole),
1973 1974 1975 1976
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
1977
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1978
			metrics.AbandonLabel).Inc()
1979

1980 1981 1982 1983 1984 1985 1986 1987
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1988 1989 1990
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1991
		zap.String("role", typeutil.ProxyRole),
1992 1993 1994
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
1995 1996 1997 1998
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1999 2000 2001 2002

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
2003
			zap.Error(err),
2004
			zap.String("traceID", traceID),
2005
			zap.String("role", typeutil.ProxyRole),
2006 2007 2008
			zap.Int64("MsgID", gibpt.ID()),
			zap.Uint64("BeginTs", gibpt.BeginTs()),
			zap.Uint64("EndTs", gibpt.EndTs()),
2009 2010 2011 2012
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
2013
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2014
			metrics.TotalLabel).Inc()
2015
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2016
			metrics.FailLabel).Inc()
2017 2018 2019 2020 2021 2022 2023 2024

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
2025 2026 2027 2028

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2029
		zap.String("role", typeutil.ProxyRole),
2030 2031 2032 2033 2034 2035 2036 2037
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName),
		zap.Any("result", gibpt.result))
2038

2039
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2040
		metrics.TotalLabel).Inc()
2041
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2042 2043
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2044
	return gibpt.result, nil
2045 2046
}

2047
// GetIndexState get the build-state of index.
C
Cai Yudong 已提交
2048
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
2049 2050 2051 2052 2053
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
2054 2055 2056 2057 2058

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2059
	dipt := &getIndexStateTask{
G
godchen 已提交
2060 2061 2062
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
2063 2064
		indexCoord:           node.indexCoord,
		rootCoord:            node.rootCoord,
2065 2066
	}

2067
	method := "GetIndexState"
2068
	tr := timerecord.NewTimeRecorder(method)
2069 2070 2071 2072

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2073
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2074 2075 2076 2077
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2078 2079 2080 2081 2082 2083

	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2084
			zap.String("role", typeutil.ProxyRole),
2085 2086 2087 2088 2089
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

2090
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2091
			metrics.AbandonLabel).Inc()
2092

G
godchen 已提交
2093
		return &milvuspb.GetIndexStateResponse{
2094
			Status: &commonpb.Status{
2095
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2096 2097 2098 2099 2100
				Reason:    err.Error(),
			},
		}, nil
	}

2101 2102 2103
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2104
		zap.String("role", typeutil.ProxyRole),
2105 2106 2107
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2108 2109 2110 2111
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2112 2113 2114 2115

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2116
			zap.Error(err),
2117
			zap.String("traceID", traceID),
2118
			zap.String("role", typeutil.ProxyRole),
2119 2120 2121
			zap.Int64("MsgID", dipt.ID()),
			zap.Uint64("BeginTs", dipt.BeginTs()),
			zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2122 2123 2124 2125 2126
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

2127
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2128
			metrics.TotalLabel).Inc()
2129
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2130
			metrics.FailLabel).Inc()
2131

G
godchen 已提交
2132
		return &milvuspb.GetIndexStateResponse{
2133
			Status: &commonpb.Status{
2134
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2135 2136 2137 2138 2139
				Reason:    err.Error(),
			},
		}, nil
	}

2140 2141 2142
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2143
		zap.String("role", typeutil.ProxyRole),
2144 2145 2146 2147 2148 2149 2150 2151
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

2152
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2153
		metrics.TotalLabel).Inc()
2154
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2155 2156
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2157 2158 2159
	return dipt.result, nil
}

2160
// Insert insert records into collection.
C
Cai Yudong 已提交
2161
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
X
Xiangyu Wang 已提交
2162 2163 2164 2165 2166 2167
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
	log.Info("Start processing insert request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing insert request in Proxy", zap.String("traceID", traceID))

2168 2169 2170 2171 2172
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2173 2174
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
2175

2176
	it := &insertTask{
2177 2178
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2179
		// req:       request,
2180 2181 2182 2183
		BaseInsertTask: BaseInsertTask{
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2184
			InsertRequest: internalpb.InsertRequest{
2185
				Base: &commonpb.MsgBase{
X
xige-16 已提交
2186 2187 2188
					MsgType:  commonpb.MsgType_Insert,
					MsgID:    0,
					SourceID: Params.ProxyCfg.ProxyID,
2189 2190 2191
				},
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2192 2193 2194
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        internalpb.InsertDataVersion_ColumnBased,
2195
				// RowData: transfer column based request to this
2196 2197
			},
		},
2198
		rowIDAllocator: node.idAllocator,
2199
		segIDAssigner:  node.segAssigner,
2200
		chMgr:          node.chMgr,
2201
		chTicker:       node.chTicker,
2202
	}
2203 2204

	if len(it.PartitionName) <= 0 {
2205
		it.PartitionName = Params.CommonCfg.DefaultPartitionName
2206 2207
	}

X
Xiangyu Wang 已提交
2208
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2209
		numRows := request.NumRows
2210 2211 2212 2213
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2214

X
Xiangyu Wang 已提交
2215 2216 2217 2218 2219 2220 2221
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2222 2223
	}

X
Xiangyu Wang 已提交
2224
	log.Debug("Enqueue insert request in Proxy",
2225
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2226 2227 2228 2229 2230
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2231 2232
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))
D
dragondriver 已提交
2233

X
Xiangyu Wang 已提交
2234 2235
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		log.Debug("Failed to enqueue insert task: " + err.Error())
2236
		metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2237
		return constructFailedResponse(err), nil
2238
	}
D
dragondriver 已提交
2239

X
Xiangyu Wang 已提交
2240
	log.Debug("Detail of insert request in Proxy",
2241
		zap.String("role", typeutil.ProxyRole),
X
Xiangyu Wang 已提交
2242
		zap.Int64("msgID", it.Base.MsgID),
D
dragondriver 已提交
2243 2244 2245 2246 2247
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
X
Xiangyu Wang 已提交
2248 2249 2250 2251 2252
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))

	if err := it.WaitToFinish(); err != nil {
		log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
2253
		metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2254
			metrics.TotalLabel).Inc()
2255
		metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2256
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2257 2258 2259 2260 2261
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2262
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2274
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2275

2276
	metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2277 2278 2279
		metrics.SuccessLabel).Inc()
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Add(float64(it.result.InsertCnt))
	metrics.ProxyInsertLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
2280 2281 2282
	return it.result, nil
}

2283
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2284
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
2285 2286 2287
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
2288 2289
	log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
2290

G
groot 已提交
2291 2292 2293 2294 2295 2296
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2297 2298 2299
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

C
Cai Yudong 已提交
2300 2301 2302 2303 2304 2305 2306
	deleteReq := &milvuspb.DeleteRequest{
		DbName:         request.DbName,
		CollectionName: request.CollectionName,
		PartitionName:  request.PartitionName,
		Expr:           request.Expr,
	}

2307
	dt := &deleteTask{
C
Cai Yudong 已提交
2308 2309 2310
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		req:       deleteReq,
G
godchen 已提交
2311
		BaseDeleteTask: BaseDeleteTask{
G
godchen 已提交
2312 2313 2314
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2315 2316 2317 2318 2319 2320 2321 2322
			DeleteRequest: internalpb.DeleteRequest{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_Delete,
					MsgID:   0,
				},
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2323 2324 2325 2326
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2327 2328
	}

2329
	log.Debug("Enqueue delete request in Proxy",
2330
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2331 2332 2333 2334
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2335 2336 2337 2338

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
		log.Error("Failed to enqueue delete task: "+err.Error(), zap.String("traceID", traceID))
2339
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2340
			metrics.FailLabel).Inc()
2341

G
groot 已提交
2342 2343 2344 2345 2346 2347 2348 2349
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2350
	log.Debug("Detail of delete request in Proxy",
2351
		zap.String("role", typeutil.ProxyRole),
G
groot 已提交
2352 2353 2354 2355 2356
		zap.Int64("msgID", dt.Base.MsgID),
		zap.Uint64("timestamp", dt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2357 2358
		zap.String("expr", request.Expr),
		zap.String("traceID", traceID))
G
groot 已提交
2359

2360 2361
	if err := dt.WaitToFinish(); err != nil {
		log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
2362
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2363
			metrics.TotalLabel).Inc()
2364
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2365
			metrics.FailLabel).Inc()
G
groot 已提交
2366 2367 2368 2369 2370 2371 2372 2373
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2374
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2375
		metrics.TotalLabel).Inc()
2376
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2377 2378
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2379 2380 2381
	return dt.result, nil
}

2382
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2383
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2384 2385 2386 2387 2388
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2389 2390
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
2391

C
cai.zhang 已提交
2392 2393
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
	defer sp.Finish()
D
dragondriver 已提交
2394 2395
	traceID, _, _ := trace.InfoFromSpan(sp)

2396
	qt := &searchTask{
S
sunby 已提交
2397
		ctx:       ctx,
2398
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2399
		SearchRequest: &internalpb.SearchRequest{
2400
			Base: &commonpb.MsgBase{
2401
				MsgType:  commonpb.MsgType_Search,
2402
				SourceID: Params.ProxyCfg.ProxyID,
2403
			},
2404
			ResultChannelID: strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2405
		},
2406
		resultBuf: make(chan []*internalpb.SearchResults, 1),
2407 2408
		query:     request,
		chMgr:     node.chMgr,
2409
		qc:        node.queryCoord,
2410
		tr:        timerecord.NewTimeRecorder("search"),
2411 2412
	}

2413 2414 2415 2416 2417
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

	log.Debug(
		rpcReceived(method),
D
dragondriver 已提交
2418
		zap.String("traceID", traceID),
2419
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2420 2421 2422 2423 2424
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2425 2426 2427 2428
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2429

2430 2431 2432
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
D
dragondriver 已提交
2433 2434
			zap.Error(err),
			zap.String("traceID", traceID),
2435
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2436 2437 2438 2439 2440 2441
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
			zap.Any("OutputFields", request.OutputFields),
2442 2443 2444
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2445

2446
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2447
			metrics.SearchLabel, metrics.AbandonLabel).Inc()
2448

2449 2450
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2451
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2452 2453 2454 2455 2456
				Reason:    err.Error(),
			},
		}, nil
	}

2457 2458
	log.Debug(
		rpcEnqueued(method),
D
dragondriver 已提交
2459
		zap.String("traceID", traceID),
2460
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2461
		zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2462 2463 2464 2465 2466
		zap.Uint64("timestamp", qt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
2467
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2468 2469 2470 2471
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2472

2473 2474 2475
	if err := qt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2476
			zap.Error(err),
D
dragondriver 已提交
2477
			zap.String("traceID", traceID),
2478
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2479
			zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2480 2481 2482 2483
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
2484
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2485 2486 2487 2488
			zap.Any("OutputFields", request.OutputFields),
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
2489
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2490
			metrics.SearchLabel, metrics.TotalLabel).Inc()
2491

2492 2493
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
			metrics.SearchLabel, metrics.FailLabel).Inc()
2494

2495 2496
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2497
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2498 2499 2500 2501 2502
				Reason:    err.Error(),
			},
		}, nil
	}

2503 2504
	log.Debug(
		rpcDone(method),
D
dragondriver 已提交
2505
		zap.String("traceID", traceID),
2506
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2507 2508 2509 2510 2511 2512
		zap.Int64("msgID", qt.ID()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2513 2514 2515 2516
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2517

2518
	metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2519
		metrics.SearchLabel, metrics.TotalLabel).Inc()
2520
	metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2521
		metrics.SearchLabel, metrics.SuccessLabel).Inc()
2522
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2523
		metrics.SearchLabel).Set(float64(qt.result.Results.NumQueries))
C
cai.zhang 已提交
2524
	searchDur := tr.ElapseSpan().Milliseconds()
2525
	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2526 2527
		metrics.SearchLabel).Observe(float64(searchDur))
	metrics.ProxySearchLatencyPerNQ.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(searchDur) / float64(qt.result.Results.NumQueries))
2528 2529 2530
	return qt.result, nil
}

2531
// Flush notify data nodes to persist the data of collection.
2532 2533 2534 2535 2536 2537 2538
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2539
	if !node.checkHealthy() {
2540 2541
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2542
	}
D
dragondriver 已提交
2543 2544 2545 2546 2547

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2548
	ft := &flushTask{
T
ThreadDao 已提交
2549 2550 2551
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2552
		dataCoord:    node.dataCoord,
2553 2554
	}

D
dragondriver 已提交
2555
	method := "Flush"
2556 2557
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2558 2559 2560 2561

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2562
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2563 2564
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2565 2566 2567 2568 2569 2570

	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2571
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2572 2573 2574
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

2575 2576
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

2577 2578
		resp.Status.Reason = err.Error()
		return resp, nil
2579 2580
	}

D
dragondriver 已提交
2581 2582 2583
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2584
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2585 2586 2587
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2588 2589
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2590 2591 2592 2593

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2594
			zap.Error(err),
D
dragondriver 已提交
2595
			zap.String("traceID", traceID),
2596
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2597 2598 2599
			zap.Int64("MsgID", ft.ID()),
			zap.Uint64("BeginTs", ft.BeginTs()),
			zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2600 2601 2602
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

2603 2604
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

D
dragondriver 已提交
2605
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2606 2607
		resp.Status.Reason = err.Error()
		return resp, nil
2608 2609
	}

D
dragondriver 已提交
2610 2611 2612
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2613
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2614 2615 2616 2617 2618 2619
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))

2620 2621
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2622
	return ft.result, nil
2623 2624
}

2625
// Query get the records by primary keys.
C
Cai Yudong 已提交
2626
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2627 2628 2629 2630 2631
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2632

D
dragondriver 已提交
2633 2634 2635
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Query")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
2636
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2637

2638
	qt := &queryTask{
2639 2640 2641 2642 2643
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_Retrieve,
2644
				SourceID: Params.ProxyCfg.ProxyID,
2645
			},
2646
			ResultChannelID: strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2647 2648
		},
		resultBuf: make(chan []*internalpb.RetrieveResults),
2649
		query:     request,
2650 2651
		chMgr:     node.chMgr,
		qc:        node.queryCoord,
2652 2653
	}

D
dragondriver 已提交
2654 2655 2656 2657 2658
	method := "Query"

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2659
		zap.String("role", typeutil.ProxyRole),
2660 2661 2662
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
G
godchen 已提交
2663

D
dragondriver 已提交
2664 2665 2666 2667 2668 2669
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
2670 2671 2672
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2673

2674
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2675
			metrics.QueryLabel, metrics.FailLabel).Inc()
2676 2677 2678 2679 2680 2681
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2682 2683
	}

D
dragondriver 已提交
2684 2685 2686
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2687
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2688 2689 2690
		zap.Int64("MsgID", qt.ID()),
		zap.Uint64("BeginTs", qt.BeginTs()),
		zap.Uint64("EndTs", qt.EndTs()),
2691 2692 2693
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2694 2695 2696 2697 2698 2699

	if err := qt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2700
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2701 2702 2703
			zap.Int64("MsgID", qt.ID()),
			zap.Uint64("BeginTs", qt.BeginTs()),
			zap.Uint64("EndTs", qt.EndTs()),
2704 2705 2706
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
2707

2708
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2709
			metrics.QueryLabel, metrics.TotalLabel).Inc()
2710
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2711
			metrics.QueryLabel, metrics.FailLabel).Inc()
2712

2713 2714 2715 2716 2717 2718 2719
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
2720

D
dragondriver 已提交
2721 2722 2723 2724 2725 2726 2727
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", qt.ID()),
		zap.Uint64("BeginTs", qt.BeginTs()),
		zap.Uint64("EndTs", qt.EndTs()),
2728 2729 2730
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2731

2732
	metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2733
		metrics.QueryLabel, metrics.TotalLabel).Inc()
2734
	metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2735
		metrics.QueryLabel, metrics.SuccessLabel).Inc()
2736
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2737
		metrics.QueryLabel).Set(float64(len(qt.result.FieldsData)))
2738
	metrics.ProxySendMessageLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2739
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
2740 2741 2742 2743 2744
	return &milvuspb.QueryResults{
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
	}, nil
}
2745

2746
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2747 2748 2749 2750
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2751 2752 2753 2754 2755

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2756 2757 2758 2759 2760 2761 2762
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2763
	method := "CreateAlias"
2764 2765
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

2785 2786
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()

Y
Yusup 已提交
2787 2788 2789 2790 2791 2792
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2793 2794 2795
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2796
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2797 2798 2799 2800
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2801 2802
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
2803 2804 2805 2806

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2807
			zap.Error(err),
D
dragondriver 已提交
2808
			zap.String("traceID", traceID),
2809
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2810 2811 2812 2813
			zap.Int64("MsgID", cat.ID()),
			zap.Uint64("BeginTs", cat.BeginTs()),
			zap.Uint64("EndTs", cat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2814 2815
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
2816
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2817 2818 2819 2820 2821 2822 2823

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2835 2836
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2837 2838 2839
	return cat.result, nil
}

2840
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2841 2842 2843 2844
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2845 2846 2847 2848 2849

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2850 2851 2852 2853 2854 2855 2856
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2857
	method := "DropAlias"
2858 2859
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias))
2876
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2877

Y
Yusup 已提交
2878 2879 2880 2881 2882 2883
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2884 2885 2886
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2887
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2888 2889 2890 2891
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2892
		zap.String("alias", request.Alias))
D
dragondriver 已提交
2893 2894 2895 2896

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2897
			zap.Error(err),
D
dragondriver 已提交
2898
			zap.String("traceID", traceID),
2899
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2900 2901 2902 2903
			zap.Int64("MsgID", dat.ID()),
			zap.Uint64("BeginTs", dat.BeginTs()),
			zap.Uint64("EndTs", dat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2904 2905
			zap.String("alias", request.Alias))

2906 2907
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

Y
Yusup 已提交
2908 2909 2910 2911 2912 2913
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2914 2915 2916 2917 2918 2919 2920 2921 2922 2923
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

2924 2925
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2926 2927 2928
	return dat.result, nil
}

2929
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2930 2931 2932 2933
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2934 2935 2936 2937 2938

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2939 2940 2941 2942 2943 2944 2945
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2946
	method := "AlterAlias"
2947 2948
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
2967
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2968

Y
Yusup 已提交
2969 2970 2971 2972 2973 2974
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2975 2976 2977
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2978
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2979 2980 2981 2982
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2983 2984
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
2985 2986 2987 2988

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2989
			zap.Error(err),
D
dragondriver 已提交
2990
			zap.String("traceID", traceID),
2991
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2992 2993 2994 2995
			zap.Int64("MsgID", aat.ID()),
			zap.Uint64("BeginTs", aat.BeginTs()),
			zap.Uint64("EndTs", aat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2996 2997 2998
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

2999 3000
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

Y
Yusup 已提交
3001 3002 3003 3004 3005 3006
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

3018 3019
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
3020 3021 3022
	return aat.result, nil
}

3023
// CalcDistance calculates the distances between vectors.
3024
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
3025 3026 3027 3028 3029
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
3030
	param, _ := funcutil.GetAttrByKeyFromRepeatedKV("metric", request.GetParams())
3031 3032 3033 3034 3035 3036 3037 3038
	metric, err := distance.ValidateMetricType(param)
	if err != nil {
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
3039 3040
	}

3041 3042 3043 3044
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CalcDistance")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

3045 3046
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
3047

3048 3049 3050 3051 3052
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
3053 3054
		}

3055
		qt := &queryTask{
3056 3057 3058 3059 3060
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_Retrieve,
3061
					SourceID: Params.ProxyCfg.ProxyID,
3062
				},
3063
				ResultChannelID: strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
3064
			},
3065
			resultBuf: make(chan []*internalpb.RetrieveResults),
3066
			query:     queryRequest,
3067
			chMgr:     node.chMgr,
3068
			qc:        node.queryCoord,
Y
yukun 已提交
3069
			ids:       ids.IdArray,
3070 3071
		}

3072
		err := node.sched.dqQueue.Enqueue(qt)
3073
		if err != nil {
3074 3075 3076
			log.Debug("CalcDistance queryTask failed to enqueue",
				zap.Error(err),
				zap.String("traceID", traceID),
3077
				zap.String("role", typeutil.ProxyRole),
3078 3079 3080 3081
				zap.String("db", queryRequest.DbName),
				zap.String("collection", queryRequest.CollectionName),
				zap.Any("partitions", queryRequest.PartitionNames))

3082 3083 3084 3085 3086
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3087
			}, err
3088
		}
3089 3090 3091

		log.Debug("CalcDistance queryTask enqueued",
			zap.String("traceID", traceID),
3092
			zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
3093 3094 3095 3096 3097 3098
			zap.Int64("msgID", qt.Base.MsgID),
			zap.Uint64("timestamp", qt.Base.Timestamp),
			zap.String("db", queryRequest.DbName),
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
			zap.Any("OutputFields", queryRequest.OutputFields))
3099 3100 3101 3102

		err = qt.WaitToFinish()
		if err != nil {
			log.Debug("CalcDistance queryTask failed to WaitToFinish",
G
godchen 已提交
3103
				zap.Error(err),
3104
				zap.String("traceID", traceID),
3105
				zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
3106 3107 3108 3109 3110 3111
				zap.Int64("msgID", qt.Base.MsgID),
				zap.Uint64("timestamp", qt.Base.Timestamp),
				zap.String("db", queryRequest.DbName),
				zap.String("collection", queryRequest.CollectionName),
				zap.Any("partitions", queryRequest.PartitionNames),
				zap.Any("OutputFields", queryRequest.OutputFields))
3112 3113 3114 3115 3116 3117

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3118
			}, err
3119
		}
3120 3121 3122

		log.Debug("CalcDistance queryTask Done",
			zap.String("traceID", traceID),
3123
			zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
3124 3125 3126 3127 3128 3129
			zap.Int64("msgID", qt.Base.MsgID),
			zap.Uint64("timestamp", qt.Base.Timestamp),
			zap.String("db", queryRequest.DbName),
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
			zap.Any("OutputFields", queryRequest.OutputFields))
3130 3131

		return &milvuspb.QueryResults{
3132 3133
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
3134 3135 3136
		}, nil
	}

3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150
	// the vectors retrieved are random order, we need re-arrange the vectors by the order of input ids
	arrangeFunc := func(ids *milvuspb.VectorIDs, retrievedFields []*schemapb.FieldData) (*schemapb.VectorField, error) {
		var retrievedIds *schemapb.ScalarField
		var retrievedVectors *schemapb.VectorField
		for _, fieldData := range retrievedFields {
			if fieldData.FieldName == ids.FieldName {
				retrievedVectors = fieldData.GetVectors()
			}
			if fieldData.Type == schemapb.DataType_Int64 {
				retrievedIds = fieldData.GetScalars()
			}
		}

		if retrievedIds == nil || retrievedVectors == nil {
3151
			return nil, errors.New("failed to fetch vectors")
3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167
		}

		dict := make(map[int64]int)
		for index, id := range retrievedIds.GetLongData().Data {
			dict[id] = index
		}

		inputIds := ids.IdArray.GetIntId().Data
		if retrievedVectors.GetFloatVector() != nil {
			floatArr := retrievedVectors.GetFloatVector().Data
			element := retrievedVectors.GetDim()
			result := make([]float32, 0, int64(len(inputIds))*element)
			for _, id := range inputIds {
				index, ok := dict[id]
				if !ok {
					log.Error("id not found in CalcDistance", zap.Int64("id", id))
3168
					return nil, errors.New("failed to fetch vectors by id: " + fmt.Sprintln(id))
3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194
				}
				result = append(result, floatArr[int64(index)*element:int64(index+1)*element]...)
			}

			return &schemapb.VectorField{
				Dim: element,
				Data: &schemapb.VectorField_FloatVector{
					FloatVector: &schemapb.FloatArray{
						Data: result,
					},
				},
			}, nil
		}

		if retrievedVectors.GetBinaryVector() != nil {
			binaryArr := retrievedVectors.GetBinaryVector()
			element := retrievedVectors.GetDim()
			if element%8 != 0 {
				element = element + 8 - element%8
			}

			result := make([]byte, 0, int64(len(inputIds))*element)
			for _, id := range inputIds {
				index, ok := dict[id]
				if !ok {
					log.Error("id not found in CalcDistance", zap.Int64("id", id))
3195
					return nil, errors.New("failed to fetch vectors by id: " + fmt.Sprintln(id))
3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207
				}
				result = append(result, binaryArr[int64(index)*element:int64(index+1)*element]...)
			}

			return &schemapb.VectorField{
				Dim: element * 8,
				Data: &schemapb.VectorField_BinaryVector{
					BinaryVector: result,
				},
			}, nil
		}

3208
		return nil, errors.New("failed to fetch vectors")
3209 3210
	}

3211 3212
	log.Debug("CalcDistance received",
		zap.String("traceID", traceID),
3213
		zap.String("role", typeutil.ProxyRole),
3214
		zap.String("metric", metric))
G
godchen 已提交
3215

3216 3217 3218
	vectorsLeft := request.GetOpLeft().GetDataArray()
	opLeft := request.GetOpLeft().GetIdArray()
	if opLeft != nil {
3219 3220
		log.Debug("OpLeft IdArray not empty, Get vectors by id",
			zap.String("traceID", traceID),
3221
			zap.String("role", typeutil.ProxyRole))
3222

3223
		result, err := query(opLeft)
3224
		if err != nil {
3225 3226 3227
			log.Debug("Failed to get left vectors by id",
				zap.Error(err),
				zap.String("traceID", traceID),
3228
				zap.String("role", typeutil.ProxyRole))
3229

3230 3231 3232 3233 3234 3235 3236 3237
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3238 3239
		log.Debug("OpLeft IdArray not empty, Get vectors by id done",
			zap.String("traceID", traceID),
3240
			zap.String("role", typeutil.ProxyRole))
3241

3242 3243
		vectorsLeft, err = arrangeFunc(opLeft, result.FieldsData)
		if err != nil {
3244 3245 3246
			log.Debug("Failed to re-arrange left vectors",
				zap.Error(err),
				zap.String("traceID", traceID),
3247
				zap.String("role", typeutil.ProxyRole))
3248

3249 3250 3251 3252 3253 3254
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
3255
		}
3256 3257 3258

		log.Debug("Re-arrange left vectors done",
			zap.String("traceID", traceID),
3259
			zap.String("role", typeutil.ProxyRole))
3260 3261
	}

G
groot 已提交
3262
	if vectorsLeft == nil {
3263 3264 3265
		msg := "Left vectors array is empty"
		log.Debug(msg,
			zap.String("traceID", traceID),
3266
			zap.String("role", typeutil.ProxyRole))
3267

G
groot 已提交
3268 3269 3270
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3271
				Reason:    msg,
G
groot 已提交
3272 3273 3274 3275
			},
		}, nil
	}

3276 3277 3278
	vectorsRight := request.GetOpRight().GetDataArray()
	opRight := request.GetOpRight().GetIdArray()
	if opRight != nil {
3279 3280
		log.Debug("OpRight IdArray not empty, Get vectors by id",
			zap.String("traceID", traceID),
3281
			zap.String("role", typeutil.ProxyRole))
3282

3283
		result, err := query(opRight)
3284
		if err != nil {
3285 3286 3287
			log.Debug("Failed to get right vectors by id",
				zap.Error(err),
				zap.String("traceID", traceID),
3288
				zap.String("role", typeutil.ProxyRole))
3289

3290 3291 3292 3293 3294 3295 3296 3297
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3298 3299
		log.Debug("OpRight IdArray not empty, Get vectors by id done",
			zap.String("traceID", traceID),
3300
			zap.String("role", typeutil.ProxyRole))
3301

3302 3303
		vectorsRight, err = arrangeFunc(opRight, result.FieldsData)
		if err != nil {
3304 3305 3306
			log.Debug("Failed to re-arrange right vectors",
				zap.Error(err),
				zap.String("traceID", traceID),
3307
				zap.String("role", typeutil.ProxyRole))
3308

3309 3310 3311 3312 3313 3314
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
3315
		}
3316 3317 3318

		log.Debug("Re-arrange right vectors done",
			zap.String("traceID", traceID),
3319
			zap.String("role", typeutil.ProxyRole))
3320 3321
	}

G
groot 已提交
3322
	if vectorsRight == nil {
3323 3324 3325
		msg := "Right vectors array is empty"
		log.Debug(msg,
			zap.String("traceID", traceID),
3326
			zap.String("role", typeutil.ProxyRole))
3327

G
groot 已提交
3328 3329 3330
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3331
				Reason:    msg,
G
groot 已提交
3332 3333 3334 3335
			},
		}, nil
	}

3336
	if vectorsLeft.Dim != vectorsRight.Dim {
3337 3338 3339
		msg := "Vectors dimension is not equal"
		log.Debug(msg,
			zap.String("traceID", traceID),
3340
			zap.String("role", typeutil.ProxyRole))
3341

3342 3343 3344
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3345
				Reason:    msg,
3346 3347 3348 3349 3350
			},
		}, nil
	}

	if vectorsLeft.GetFloatVector() != nil && vectorsRight.GetFloatVector() != nil {
3351 3352
		distances, err := distance.CalcFloatDistance(vectorsLeft.Dim, vectorsLeft.GetFloatVector().Data, vectorsRight.GetFloatVector().Data, metric)
		if err != nil {
3353 3354 3355
			log.Debug("Failed to CalcFloatDistance",
				zap.Error(err),
				zap.String("traceID", traceID),
3356
				zap.String("role", typeutil.ProxyRole))
3357

3358 3359 3360 3361 3362 3363 3364 3365
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3366 3367 3368
		log.Debug("CalcFloatDistance done",
			zap.Error(err),
			zap.String("traceID", traceID),
3369
			zap.String("role", typeutil.ProxyRole))
3370

3371 3372 3373 3374 3375 3376 3377 3378 3379 3380
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
			Array: &milvuspb.CalcDistanceResults_FloatDist{
				FloatDist: &schemapb.FloatArray{
					Data: distances,
				},
			},
		}, nil
	}

3381
	if vectorsLeft.GetBinaryVector() != nil && vectorsRight.GetBinaryVector() != nil {
G
groot 已提交
3382
		hamming, err := distance.CalcHammingDistance(vectorsLeft.Dim, vectorsLeft.GetBinaryVector(), vectorsRight.GetBinaryVector())
3383
		if err != nil {
3384 3385 3386
			log.Debug("Failed to CalcHammingDistance",
				zap.Error(err),
				zap.String("traceID", traceID),
3387
				zap.String("role", typeutil.ProxyRole))
3388

3389 3390 3391 3392 3393 3394 3395 3396 3397
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

		if metric == distance.HAMMING {
3398 3399
			log.Debug("CalcHammingDistance done",
				zap.String("traceID", traceID),
3400
				zap.String("role", typeutil.ProxyRole))
3401

3402 3403 3404 3405
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
				Array: &milvuspb.CalcDistanceResults_IntDist{
					IntDist: &schemapb.IntArray{
G
groot 已提交
3406
						Data: hamming,
3407 3408 3409 3410 3411 3412
					},
				},
			}, nil
		}

		if metric == distance.TANIMOTO {
G
groot 已提交
3413
			tanimoto, err := distance.CalcTanimotoCoefficient(vectorsLeft.Dim, hamming)
3414
			if err != nil {
3415 3416 3417
				log.Debug("Failed to CalcTanimotoCoefficient",
					zap.Error(err),
					zap.String("traceID", traceID),
3418
					zap.String("role", typeutil.ProxyRole))
3419

3420 3421 3422 3423 3424 3425 3426 3427
				return &milvuspb.CalcDistanceResults{
					Status: &commonpb.Status{
						ErrorCode: commonpb.ErrorCode_UnexpectedError,
						Reason:    err.Error(),
					},
				}, nil
			}

3428 3429
			log.Debug("CalcTanimotoCoefficient done",
				zap.String("traceID", traceID),
3430
				zap.String("role", typeutil.ProxyRole))
3431

3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
				Array: &milvuspb.CalcDistanceResults_FloatDist{
					FloatDist: &schemapb.FloatArray{
						Data: tanimoto,
					},
				},
			}, nil
		}
	}

3443
	err = errors.New("unexpected error")
3444
	if (vectorsLeft.GetBinaryVector() != nil && vectorsRight.GetFloatVector() != nil) || (vectorsLeft.GetFloatVector() != nil && vectorsRight.GetBinaryVector() != nil) {
3445
		err = errors.New("cannot calculate distance between binary vectors and float vectors")
3446 3447
	}

3448 3449 3450
	log.Debug("Failed to CalcDistance",
		zap.Error(err),
		zap.String("traceID", traceID),
3451
		zap.String("role", typeutil.ProxyRole))
3452

3453 3454 3455 3456 3457 3458 3459 3460
	return &milvuspb.CalcDistanceResults{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		},
	}, nil
}

3461
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
3462
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
3463 3464
	panic("implement me")
}
X
XuanYang-cn 已提交
3465

3466
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
3467
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
D
dragondriver 已提交
3468
	log.Debug("GetPersistentSegmentInfo",
3469
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3470 3471 3472
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3473
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
3474
		Status: &commonpb.Status{
3475
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
3476 3477
		},
	}
3478 3479 3480 3481
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3482 3483 3484
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
3485
		metrics.TotalLabel).Inc()
G
godchen 已提交
3486
	segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName)
X
XuanYang-cn 已提交
3487
	if err != nil {
3488
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3489 3490
		return resp, nil
	}
3491
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
X
XuanYang-cn 已提交
3492
		Base: &commonpb.MsgBase{
3493
			MsgType:   commonpb.MsgType_SegmentInfo,
X
XuanYang-cn 已提交
3494 3495
			MsgID:     0,
			Timestamp: 0,
3496
			SourceID:  Params.ProxyCfg.ProxyID,
X
XuanYang-cn 已提交
3497 3498 3499 3500
		},
		SegmentIDs: segments,
	})
	if err != nil {
3501
		log.Debug("GetPersistentSegmentInfo fail", zap.Error(err))
3502
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3503 3504
		return resp, nil
	}
3505
	log.Debug("GetPersistentSegmentInfo ", zap.Int("len(infos)", len(infoResp.Infos)), zap.Any("status", infoResp.Status))
3506
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3507 3508 3509 3510 3511 3512
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3513
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3514 3515
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3516
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3517 3518 3519
			State:        info.State,
		}
	}
3520
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
3521 3522
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3523
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3524 3525 3526 3527
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3528
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3529
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
D
dragondriver 已提交
3530
	log.Debug("GetQuerySegmentInfo",
3531
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3532 3533 3534
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3535
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3536
		Status: &commonpb.Status{
3537
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3538 3539
		},
	}
3540 3541 3542 3543
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
G
godchen 已提交
3544
	segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName)
Z
zhenshan.cao 已提交
3545 3546 3547 3548
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3549 3550 3551 3552 3553
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3554
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
Z
zhenshan.cao 已提交
3555
		Base: &commonpb.MsgBase{
3556
			MsgType:   commonpb.MsgType_SegmentInfo,
Z
zhenshan.cao 已提交
3557 3558
			MsgID:     0,
			Timestamp: 0,
3559
			SourceID:  Params.ProxyCfg.ProxyID,
Z
zhenshan.cao 已提交
3560
		},
3561 3562
		CollectionID: collID,
		SegmentIDs:   segments,
Z
zhenshan.cao 已提交
3563 3564
	})
	if err != nil {
3565
		log.Error("Failed to get segment info from QueryCoord",
3566
			zap.Int64s("segmentIDs", segments), zap.Error(err))
Z
zhenshan.cao 已提交
3567 3568 3569
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3570
	log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status))
3571
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
3572
		log.Error("Failed to get segment info from QueryCoord", zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3573 3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
3586
			NodeID:       info.NodeID,
X
xige-16 已提交
3587
			State:        info.SegmentState,
Z
zhenshan.cao 已提交
3588 3589
		}
	}
3590
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3591 3592 3593 3594
	resp.Infos = queryInfos
	return resp, nil
}

C
Cai Yudong 已提交
3595
func (node *Proxy) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
3596
	describeCollectionResponse, err := node.rootCoord.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
X
XuanYang-cn 已提交
3597
		Base: &commonpb.MsgBase{
3598
			MsgType:   commonpb.MsgType_DescribeCollection,
X
XuanYang-cn 已提交
3599 3600
			MsgID:     0,
			Timestamp: 0,
3601
			SourceID:  Params.ProxyCfg.ProxyID,
X
XuanYang-cn 已提交
3602 3603 3604 3605 3606 3607 3608
		},
		DbName:         dbName,
		CollectionName: collectionName,
	})
	if err != nil {
		return nil, err
	}
3609
	if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3610 3611 3612
		return nil, errors.New(describeCollectionResponse.Status.Reason)
	}
	collectionID := describeCollectionResponse.CollectionID
3613
	showPartitionsResp, err := node.rootCoord.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
X
XuanYang-cn 已提交
3614
		Base: &commonpb.MsgBase{
3615
			MsgType:   commonpb.MsgType_ShowPartitions,
X
XuanYang-cn 已提交
3616 3617
			MsgID:     0,
			Timestamp: 0,
3618
			SourceID:  Params.ProxyCfg.ProxyID,
X
XuanYang-cn 已提交
3619 3620 3621 3622 3623 3624 3625 3626
		},
		DbName:         dbName,
		CollectionName: collectionName,
		CollectionID:   collectionID,
	})
	if err != nil {
		return nil, err
	}
3627
	if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3628 3629 3630 3631 3632
		return nil, errors.New(showPartitionsResp.Status.Reason)
	}

	ret := make([]UniqueID, 0)
	for _, partitionID := range showPartitionsResp.PartitionIDs {
3633
		showSegmentResponse, err := node.rootCoord.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{
X
XuanYang-cn 已提交
3634
			Base: &commonpb.MsgBase{
3635
				MsgType:   commonpb.MsgType_ShowSegments,
X
XuanYang-cn 已提交
3636 3637
				MsgID:     0,
				Timestamp: 0,
3638
				SourceID:  Params.ProxyCfg.ProxyID,
X
XuanYang-cn 已提交
3639 3640 3641 3642 3643 3644 3645
			},
			CollectionID: collectionID,
			PartitionID:  partitionID,
		})
		if err != nil {
			return nil, err
		}
3646
		if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3647 3648 3649 3650 3651 3652
			return nil, errors.New(showSegmentResponse.Status.Reason)
		}
		ret = append(ret, showSegmentResponse.SegmentIDs...)
	}
	return ret, nil
}
3653

J
jingkl 已提交
3654
// Dummy handles dummy request
C
Cai Yudong 已提交
3655
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
	if err != nil {
		log.Debug("Failed to parse dummy request type")
		return failedResponse, nil
	}

3667 3668
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3669
		if err != nil {
3670
			log.Debug("Failed to parse dummy query request")
3671 3672 3673
			return failedResponse, nil
		}

3674
		request := &milvuspb.QueryRequest{
3675 3676 3677
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3678
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3679 3680
		}

3681
		_, err = node.Query(ctx, request)
3682
		if err != nil {
3683
			log.Debug("Failed to execute dummy query")
3684 3685
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3686 3687 3688 3689 3690 3691

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3692 3693
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3694 3695
}

J
jingkl 已提交
3696
// RegisterLink registers a link
C
Cai Yudong 已提交
3697
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
G
godchen 已提交
3698
	code := node.stateCode.Load().(internalpb.StateCode)
D
dragondriver 已提交
3699
	log.Debug("RegisterLink",
3700
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3701
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3702

G
godchen 已提交
3703
	if code != internalpb.StateCode_Healthy {
3704 3705 3706
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3707
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3708
				Reason:    "proxy not healthy",
3709 3710 3711
			},
		}, nil
	}
3712
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Inc()
3713 3714 3715
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3716
			ErrorCode: commonpb.ErrorCode_Success,
3717
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3718 3719 3720
		},
	}, nil
}
3721

3722 3723 3724
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	log.Debug("Proxy.GetMetrics",
3725
		zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3726 3727 3728 3729
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
3730
			zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3731
			zap.String("req", req.Request),
3732
			zap.Error(errProxyIsUnhealthy(Params.ProxyCfg.ProxyID)))
3733 3734 3735 3736

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3737
				Reason:    msgProxyIsUnhealthy(Params.ProxyCfg.ProxyID),
3738 3739 3740 3741 3742 3743 3744 3745
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
3746
			zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	log.Debug("Proxy.GetMetrics",
		zap.String("metric_type", metricType))

D
dragondriver 已提交
3762 3763 3764 3765 3766 3767 3768 3769 3770 3771
	msgID := UniqueID(0)
	msgID, err = node.idAllocator.AllocOne()
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to allocate id",
			zap.Error(err))
	}
	req.Base = &commonpb.MsgBase{
		MsgType:   commonpb.MsgType_SystemInfo,
		MsgID:     msgID,
		Timestamp: 0,
3772
		SourceID:  Params.ProxyCfg.ProxyID,
D
dragondriver 已提交
3773 3774
	}

3775
	if metricType == metricsinfo.SystemInfoMetrics {
3776 3777 3778 3779 3780 3781 3782
		ret, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

3783
		metrics, err := getSystemInfoMetrics(ctx, req, node)
3784 3785

		log.Debug("Proxy.GetMetrics",
3786
			zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3787 3788 3789 3790 3791
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3792 3793
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3794
		return metrics, nil
3795 3796 3797
	}

	log.Debug("Proxy.GetMetrics failed, request metric type is not implemented yet",
3798
		zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

B
bigsheeper 已提交
3811 3812 3813
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
	log.Debug("Proxy.LoadBalance",
3814
		zap.Int64("proxy_id", Params.ProxyCfg.ProxyID),
B
bigsheeper 已提交
3815 3816 3817 3818 3819 3820 3821 3822 3823 3824 3825 3826 3827 3828
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_LoadBalanceSegments,
			MsgID:     0,
			Timestamp: 0,
3829
			SourceID:  Params.ProxyCfg.ProxyID,
B
bigsheeper 已提交
3830 3831 3832
		},
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3833
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844 3845 3846 3847 3848 3849 3850 3851
		SealedSegmentIDs: req.SealedSegmentIDs,
	})
	if err != nil {
		log.Error("Failed to LoadBalance from Query Coordinator",
			zap.Any("req", req), zap.Error(err))
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
		log.Error("Failed to LoadBalance from Query Coordinator", zap.String("errMsg", infoResp.Reason))
		status.Reason = infoResp.Reason
		return status, nil
	}
	log.Debug("LoadBalance Done", zap.Any("req", req), zap.Any("status", infoResp))
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

J
jingkl 已提交
3852
//GetCompactionState gets the compaction state of multiple segments
3853 3854 3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
	log.Info("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
	log.Info("received GetCompactionState response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

3866
// ManualCompaction invokes compaction on specified collection
3867 3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
	log.Info("received ManualCompaction request", zap.Int64("collectionID", req.GetCollectionID()))
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
	log.Info("received ManualCompaction response", zap.Int64("collectionID", req.GetCollectionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
	log.Info("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
	log.Info("received GetCompactionStateWithPlans response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

B
Bingyi Sun 已提交
3893 3894 3895
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
	log.Info("received get flush state request", zap.Any("request", req))
3896
	var err error
B
Bingyi Sun 已提交
3897 3898 3899 3900 3901 3902 3903
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		log.Info("unable to get flush state because of closed server")
		return resp, nil
	}

3904
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3905 3906 3907 3908
	if err != nil {
		log.Info("failed to get flush state response", zap.Error(err))
		return nil, err
	}
B
Bingyi Sun 已提交
3909 3910 3911 3912
	log.Info("received get flush state response", zap.Any("response", resp))
	return resp, err
}

C
Cai Yudong 已提交
3913 3914
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3915 3916 3917 3918
	code := node.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

J
jingkl 已提交
3919
//unhealthyStatus returns the proxy not healthy status
3920 3921 3922
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3923
		Reason:    "proxy not healthy",
3924 3925
	}
}
G
groot 已提交
3926 3927 3928

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
G
groot 已提交
3929
	log.Info("received import request")
G
groot 已提交
3930 3931 3932 3933 3934 3935
	resp := &milvuspb.ImportResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

G
groot 已提交
3936 3937 3938
	resp, err := node.rootCoord.Import(ctx, req)
	log.Info("received import response", zap.String("collectionName", req.GetCollectionName()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
G
groot 已提交
3939 3940 3941 3942
}

// Check import task state from datanode
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
G
groot 已提交
3943
	log.Info("received get import state request", zap.Int64("taskID", req.GetTask()))
G
groot 已提交
3944 3945 3946 3947 3948 3949
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

G
groot 已提交
3950 3951 3952
	resp, err := node.rootCoord.GetImportState(ctx, req)
	log.Info("received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
G
groot 已提交
3953
}