impl.go 134.2 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

C
Cai Yudong 已提交
17
package proxy
18 19 20

import (
	"context"
21
	"errors"
22
	"fmt"
C
cai.zhang 已提交
23
	"os"
24 25
	"strconv"

26 27
	"github.com/milvus-io/milvus/internal/util"

28
	"go.uber.org/zap"
G
groot 已提交
29
	"go.uber.org/zap/zapcore"
S
sunby 已提交
30

31
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/log"
33
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
34
	"github.com/milvus-io/milvus/internal/mq/msgstream"
35

X
Xiangyu Wang 已提交
36 37 38 39 40 41
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
42
	"github.com/milvus-io/milvus/internal/util/crypto"
43 44
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
45
	"github.com/milvus-io/milvus/internal/util/timerecord"
46
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
47
	"github.com/milvus-io/milvus/internal/util/typeutil"
48 49
)

50 51
const moduleName = "Proxy"

52
// UpdateStateCode updates the state code of Proxy.
C
Cai Yudong 已提交
53
func (node *Proxy) UpdateStateCode(code internalpb.StateCode) {
54
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
55 56
}

57
// GetComponentStates get state of Proxy.
C
Cai Yudong 已提交
58
func (node *Proxy) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
59 60 61 62 63 64 65 66 67 68 69 70
	stats := &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	code, ok := node.stateCode.Load().(internalpb.StateCode)
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
71
		return stats, nil
G
godchen 已提交
72
	}
73 74 75 76
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
G
godchen 已提交
77
	info := &internalpb.ComponentInfo{
78 79
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
80
		Role:      typeutil.ProxyRole,
G
godchen 已提交
81 82 83 84 85 86
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
87
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
88
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
89 90 91 92 93 94 95 96 97
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

98
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
99
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
100
	ctx = logutil.WithModule(ctx, moduleName)
101
	logutil.Logger(ctx).Info("received request to invalidate collection meta cache",
102
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
103
		zap.String("db", request.DbName),
104 105
		zap.String("collectionName", request.CollectionName),
		zap.Int64("collectionID", request.CollectionID))
D
dragondriver 已提交
106

107
	collectionName := request.CollectionName
108
	collectionID := request.CollectionID
N
neza2017 已提交
109
	if globalMetaCache != nil {
110 111 112 113 114 115
		if collectionName != "" {
			globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
		}
		if request.CollectionID != UniqueID(0) {
			globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
		}
N
neza2017 已提交
116
	}
117
	logutil.Logger(ctx).Info("complete to invalidate collection meta cache",
118
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
119
		zap.String("db", request.DbName),
120 121
		zap.String("collection", collectionName),
		zap.Int64("collectionID", collectionID))
D
dragondriver 已提交
122

123
	return &commonpb.Status{
124
		ErrorCode: commonpb.ErrorCode_Success,
125 126
		Reason:    "",
	}, nil
127 128
}

129
// ReleaseDQLMessageStream release the query message stream of specific collection.
C
Cai Yudong 已提交
130
func (node *Proxy) ReleaseDQLMessageStream(ctx context.Context, request *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
131 132
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to release DQL message strem",
133
		zap.Any("role", typeutil.ProxyRole),
134 135 136
		zap.Any("db", request.DbID),
		zap.Any("collection", request.CollectionID))

137 138 139 140
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

141 142
	_ = node.chMgr.removeDQLStream(request.CollectionID)

143
	logutil.Logger(ctx).Debug("complete to release DQL message stream",
144
		zap.Any("role", typeutil.ProxyRole),
145 146 147 148 149 150 151 152 153
		zap.Any("db", request.DbID),
		zap.Any("collection", request.CollectionID))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

154
// CreateCollection create a collection by the schema.
155
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
C
Cai Yudong 已提交
156
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
157 158 159
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
160 161 162 163

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
164 165 166
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

X
Xiaofan 已提交
167
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
168

169
	cct := &createCollectionTask{
S
sunby 已提交
170
		ctx:                     ctx,
171 172
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
173
		rootCoord:               node.rootCoord,
174 175
	}

176 177 178
	// avoid data race
	lenOfSchema := len(request.Schema)

179 180
	log.Debug(
		rpcReceived(method),
181
		zap.String("traceID", traceID),
182
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
183 184
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
185
		zap.Int("len(schema)", lenOfSchema),
186 187
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
188

189 190 191
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
192 193
			zap.Error(err),
			zap.String("traceID", traceID),
194
			zap.String("role", typeutil.ProxyRole),
195 196 197
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Int("len(schema)", lenOfSchema),
198 199
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
200

X
Xiaofan 已提交
201
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
202
		return &commonpb.Status{
203
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
204 205 206 207
			Reason:    err.Error(),
		}, nil
	}

208 209
	log.Debug(
		rpcEnqueued(method),
210
		zap.String("traceID", traceID),
211
		zap.String("role", typeutil.ProxyRole),
212 213 214
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
215 216
		zap.Uint64("timestamp", request.Base.Timestamp),
		zap.String("db", request.DbName),
217 218
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
219 220
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
221

222 223 224
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
225
			zap.Error(err),
226
			zap.String("traceID", traceID),
227
			zap.String("role", typeutil.ProxyRole),
228 229 230
			zap.Int64("MsgID", cct.ID()),
			zap.Uint64("BeginTs", cct.BeginTs()),
			zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
231 232
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
233
			zap.Int("len(schema)", lenOfSchema),
234 235
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
D
dragondriver 已提交
236

X
Xiaofan 已提交
237
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
238
		return &commonpb.Status{
239
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
240 241 242 243
			Reason:    err.Error(),
		}, nil
	}

244 245
	log.Debug(
		rpcDone(method),
246
		zap.String("traceID", traceID),
247
		zap.String("role", typeutil.ProxyRole),
248 249 250 251 252 253
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
254 255
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
256

X
Xiaofan 已提交
257 258
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
259 260 261
	return cct.result, nil
}

262
// DropCollection drop a collection.
C
Cai Yudong 已提交
263
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
264 265 266
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
267 268 269 270

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
271 272
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
273
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
274

275
	dct := &dropCollectionTask{
S
sunby 已提交
276
		ctx:                   ctx,
277 278
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
279
		rootCoord:             node.rootCoord,
280
		chMgr:                 node.chMgr,
S
sunby 已提交
281
		chTicker:              node.chTicker,
282 283
	}

284 285
	log.Debug("DropCollection received",
		zap.String("traceID", traceID),
286
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
287 288
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
289 290 291 292 293

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
294
			zap.String("role", typeutil.ProxyRole),
295 296 297
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
298
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
299
		return &commonpb.Status{
300
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
301 302 303 304
			Reason:    err.Error(),
		}, nil
	}

305 306
	log.Debug("DropCollection enqueued",
		zap.String("traceID", traceID),
307
		zap.String("role", typeutil.ProxyRole),
308 309 310
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
311 312
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
313 314 315

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
316
			zap.Error(err),
317
			zap.String("traceID", traceID),
318
			zap.String("role", typeutil.ProxyRole),
319 320 321
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTs", dct.BeginTs()),
			zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
322 323 324
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
325
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
326
		return &commonpb.Status{
327
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
328 329 330 331
			Reason:    err.Error(),
		}, nil
	}

332 333
	log.Debug("DropCollection done",
		zap.String("traceID", traceID),
334
		zap.String("role", typeutil.ProxyRole),
335 336 337 338 339 340
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
341 342
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
343 344 345
	return dct.result, nil
}

346
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
347
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
348 349 350 351 352
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
353 354 355 356

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
357 358
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
359
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
360
		metrics.TotalLabel).Inc()
361 362 363

	log.Debug("HasCollection received",
		zap.String("traceID", traceID),
364
		zap.String("role", typeutil.ProxyRole),
365 366 367
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

368
	hct := &hasCollectionTask{
S
sunby 已提交
369
		ctx:                  ctx,
370 371
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
372
		rootCoord:            node.rootCoord,
373 374
	}

375 376 377 378
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
379
			zap.String("role", typeutil.ProxyRole),
380 381 382
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
383
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
384
			metrics.AbandonLabel).Inc()
385 386
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
387
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
388 389 390 391 392
				Reason:    err.Error(),
			},
		}, nil
	}

393 394
	log.Debug("HasCollection enqueued",
		zap.String("traceID", traceID),
395
		zap.String("role", typeutil.ProxyRole),
396 397 398
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
399 400
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
401 402 403

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
404
			zap.Error(err),
405
			zap.String("traceID", traceID),
406
			zap.String("role", typeutil.ProxyRole),
407 408 409
			zap.Int64("MsgID", hct.ID()),
			zap.Uint64("BeginTS", hct.BeginTs()),
			zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
410 411 412
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
413
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
414
			metrics.FailLabel).Inc()
415 416
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
417
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
418 419 420 421 422
				Reason:    err.Error(),
			},
		}, nil
	}

423 424
	log.Debug("HasCollection done",
		zap.String("traceID", traceID),
425
		zap.String("role", typeutil.ProxyRole),
426 427 428 429 430 431
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
432
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
433
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
434
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
435 436 437
	return hct.result, nil
}

438
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
439
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
440 441 442
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
443 444 445 446

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
447 448
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
449

450
	lct := &loadCollectionTask{
S
sunby 已提交
451
		ctx:                   ctx,
452 453
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
454
		queryCoord:            node.queryCoord,
455 456
	}

457 458
	log.Debug("LoadCollection received",
		zap.String("traceID", traceID),
459
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
460 461
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
462 463 464 465 466

	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
467
			zap.String("role", typeutil.ProxyRole),
468 469 470
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
471
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
472
			metrics.AbandonLabel).Inc()
473
		return &commonpb.Status{
474
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
475 476 477
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
478

479 480
	log.Debug("LoadCollection enqueued",
		zap.String("traceID", traceID),
481
		zap.String("role", typeutil.ProxyRole),
482 483 484
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
485 486
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
487 488 489

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
490
			zap.Error(err),
491
			zap.String("traceID", traceID),
492
			zap.String("role", typeutil.ProxyRole),
493 494 495
			zap.Int64("MsgID", lct.ID()),
			zap.Uint64("BeginTS", lct.BeginTs()),
			zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
496 497 498
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
499
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
500
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
501
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
502
			metrics.FailLabel).Inc()
503
		return &commonpb.Status{
504
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
505 506 507 508
			Reason:    err.Error(),
		}, nil
	}

509 510
	log.Debug("LoadCollection done",
		zap.String("traceID", traceID),
511
		zap.String("role", typeutil.ProxyRole),
512 513 514 515 516 517
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
518
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
519
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
520
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
521
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
522
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
523
	return lct.result, nil
524 525
}

526
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
527
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
528 529 530
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
531

532
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleaseCollection")
533 534
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
535 536
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
537

538
	rct := &releaseCollectionTask{
S
sunby 已提交
539
		ctx:                      ctx,
540 541
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
542
		queryCoord:               node.queryCoord,
543
		chMgr:                    node.chMgr,
544 545
	}

546 547
	log.Debug(
		rpcReceived(method),
548
		zap.String("traceID", traceID),
549
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
550 551
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
552 553

	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
554 555
		log.Warn(
			rpcFailedToEnqueue(method),
556 557
			zap.Error(err),
			zap.String("traceID", traceID),
558
			zap.String("role", typeutil.ProxyRole),
559 560 561
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
562
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
563
			metrics.AbandonLabel).Inc()
564
		return &commonpb.Status{
565
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
566 567 568 569
			Reason:    err.Error(),
		}, nil
	}

570 571
	log.Debug(
		rpcEnqueued(method),
572
		zap.String("traceID", traceID),
573
		zap.String("role", typeutil.ProxyRole),
574 575 576
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
577 578
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
579 580

	if err := rct.WaitToFinish(); err != nil {
581 582
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
583
			zap.Error(err),
584
			zap.String("traceID", traceID),
585
			zap.String("role", typeutil.ProxyRole),
586 587 588
			zap.Int64("MsgID", rct.ID()),
			zap.Uint64("BeginTS", rct.BeginTs()),
			zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
589 590 591
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
592
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
593
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
594
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
595
			metrics.FailLabel).Inc()
596
		return &commonpb.Status{
597
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
598 599 600 601
			Reason:    err.Error(),
		}, nil
	}

602 603
	log.Debug(
		rpcDone(method),
604
		zap.String("traceID", traceID),
605
		zap.String("role", typeutil.ProxyRole),
606 607 608 609 610 611
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
612
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
613
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
614
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
615
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
616
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
617
	return rct.result, nil
618 619
}

620
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
621
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
622 623 624 625 626
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
627

628
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeCollection")
629 630
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
631 632
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
633

634
	dct := &describeCollectionTask{
S
sunby 已提交
635
		ctx:                       ctx,
636 637
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
638
		rootCoord:                 node.rootCoord,
639 640
	}

641 642
	log.Debug("DescribeCollection received",
		zap.String("traceID", traceID),
643
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
644 645
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
646 647 648 649 650

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
651
			zap.String("role", typeutil.ProxyRole),
652 653 654
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
655
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
656
			metrics.AbandonLabel).Inc()
657 658
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
659
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
660 661 662 663 664
				Reason:    err.Error(),
			},
		}, nil
	}

665 666
	log.Debug("DescribeCollection enqueued",
		zap.String("traceID", traceID),
667
		zap.String("role", typeutil.ProxyRole),
668 669 670
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
671 672
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
673 674 675

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
676
			zap.Error(err),
677
			zap.String("traceID", traceID),
678
			zap.String("role", typeutil.ProxyRole),
679 680 681
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTS", dct.BeginTs()),
			zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
682 683 684
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
685
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
686
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
687
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
688
			metrics.FailLabel).Inc()
689

690 691
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
692
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
693 694 695 696 697
				Reason:    err.Error(),
			},
		}, nil
	}

698 699
	log.Debug("DescribeCollection done",
		zap.String("traceID", traceID),
700
		zap.String("role", typeutil.ProxyRole),
701 702 703 704 705 706
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
707
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
708
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
709
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
710
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
711
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
712 713 714
	return dct.result, nil
}

715
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
716
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
717 718 719 720 721
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
722 723 724 725

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCollectionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
726 727
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
728

729
	g := &getCollectionStatisticsTask{
G
godchen 已提交
730 731 732
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
733
		dataCoord:                      node.dataCoord,
734 735
	}

736 737
	log.Debug("GetCollectionStatistics received",
		zap.String("traceID", traceID),
738
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
739 740
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
741 742 743 744 745

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn("GetCollectionStatistics failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
746
			zap.String("role", typeutil.ProxyRole),
747 748 749
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
750
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
751
			metrics.AbandonLabel).Inc()
752

G
godchen 已提交
753
		return &milvuspb.GetCollectionStatisticsResponse{
754
			Status: &commonpb.Status{
755
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
756 757 758 759 760
				Reason:    err.Error(),
			},
		}, nil
	}

761 762
	log.Debug("GetCollectionStatistics enqueued",
		zap.String("traceID", traceID),
763
		zap.String("role", typeutil.ProxyRole),
764 765 766
		zap.Int64("MsgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
767 768
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
769 770 771

	if err := g.WaitToFinish(); err != nil {
		log.Warn("GetCollectionStatistics failed to WaitToFinish",
D
dragondriver 已提交
772
			zap.Error(err),
773
			zap.String("traceID", traceID),
774
			zap.String("role", typeutil.ProxyRole),
775 776 777
			zap.Int64("MsgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
778 779 780
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
781
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
782
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
783
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
784
			metrics.FailLabel).Inc()
785

G
godchen 已提交
786
		return &milvuspb.GetCollectionStatisticsResponse{
787
			Status: &commonpb.Status{
788
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
789 790 791 792 793
				Reason:    err.Error(),
			},
		}, nil
	}

794 795
	log.Debug("GetCollectionStatistics done",
		zap.String("traceID", traceID),
796
		zap.String("role", typeutil.ProxyRole),
797 798 799 800 801 802
		zap.Int64("MsgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
803
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
804
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
805
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
806
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
807
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
808
	return g.result, nil
809 810
}

811
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
812
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
813 814 815 816 817
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
818 819
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
820
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
821

822
	sct := &showCollectionsTask{
G
godchen 已提交
823 824 825
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
826
		queryCoord:             node.queryCoord,
827
		rootCoord:              node.rootCoord,
828 829
	}

830
	log.Debug("ShowCollections received",
831
		zap.String("role", typeutil.ProxyRole),
832 833 834 835 836 837
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
		zap.Any("CollectionNames", request.CollectionNames),
	)

838
	err := node.sched.ddQueue.Enqueue(sct)
839
	if err != nil {
840 841
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
842
			zap.String("role", typeutil.ProxyRole),
843 844 845 846 847 848
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

X
Xiaofan 已提交
849
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
850
		return &milvuspb.ShowCollectionsResponse{
851
			Status: &commonpb.Status{
852
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
853 854 855 856 857
				Reason:    err.Error(),
			},
		}, nil
	}

858
	log.Debug("ShowCollections enqueued",
859
		zap.String("role", typeutil.ProxyRole),
860
		zap.Int64("MsgID", sct.ID()),
861
		zap.String("DbName", sct.ShowCollectionsRequest.DbName),
862
		zap.Uint64("TimeStamp", request.TimeStamp),
863 864 865
		zap.String("ShowType", sct.ShowCollectionsRequest.Type.String()),
		zap.Any("CollectionNames", sct.ShowCollectionsRequest.CollectionNames),
	)
D
dragondriver 已提交
866

867 868
	err = sct.WaitToFinish()
	if err != nil {
869 870
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
871
			zap.String("role", typeutil.ProxyRole),
872 873 874 875 876 877 878
			zap.Int64("MsgID", sct.ID()),
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

X
Xiaofan 已提交
879
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
880

G
godchen 已提交
881
		return &milvuspb.ShowCollectionsResponse{
882
			Status: &commonpb.Status{
883
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
884 885 886 887 888
				Reason:    err.Error(),
			},
		}, nil
	}

889
	log.Debug("ShowCollections Done",
890
		zap.String("role", typeutil.ProxyRole),
891 892 893 894
		zap.Int64("MsgID", sct.ID()),
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
895 896
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
897

X
Xiaofan 已提交
898 899
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
900 901 902
	return sct.result, nil
}

903
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
904
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
905 906 907
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
908

909
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreatePartition")
910 911
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
912 913
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
914
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
915

916
	cpt := &createPartitionTask{
S
sunby 已提交
917
		ctx:                    ctx,
918 919
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
920
		rootCoord:              node.rootCoord,
921 922 923
		result:                 nil,
	}

924 925 926
	log.Debug(
		rpcReceived("CreatePartition"),
		zap.String("traceID", traceID),
927
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
928 929 930
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
931 932 933 934 935 936

	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
			zap.Error(err),
			zap.String("traceID", traceID),
937
			zap.String("role", typeutil.ProxyRole),
938 939 940 941
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
942
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
943

944
		return &commonpb.Status{
945
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
946 947 948
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
949

950 951 952
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.String("traceID", traceID),
953
		zap.String("role", typeutil.ProxyRole),
954 955 956
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
957 958 959
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
960 961 962 963

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
964
			zap.Error(err),
965
			zap.String("traceID", traceID),
966
			zap.String("role", typeutil.ProxyRole),
967 968 969
			zap.Int64("MsgID", cpt.ID()),
			zap.Uint64("BeginTS", cpt.BeginTs()),
			zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
970 971 972 973
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
974
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
975

976
		return &commonpb.Status{
977
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
978 979 980
			Reason:    err.Error(),
		}, nil
	}
981 982 983 984

	log.Debug(
		rpcDone("CreatePartition"),
		zap.String("traceID", traceID),
985
		zap.String("role", typeutil.ProxyRole),
986 987 988 989 990 991 992
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
993 994
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
995 996 997
	return cpt.result, nil
}

998
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
999
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
1000 1001 1002
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1003

1004
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropPartition")
1005 1006
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1007 1008
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
1009
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1010

1011
	dpt := &dropPartitionTask{
S
sunby 已提交
1012
		ctx:                  ctx,
1013 1014
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
1015
		rootCoord:            node.rootCoord,
1016 1017 1018
		result:               nil,
	}

1019 1020 1021
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1022
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1023 1024 1025
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1026 1027 1028 1029 1030 1031

	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1032
			zap.String("role", typeutil.ProxyRole),
1033 1034 1035 1036
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1037
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
1038

1039
		return &commonpb.Status{
1040
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1041 1042 1043
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1044

1045 1046 1047
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1048
		zap.String("role", typeutil.ProxyRole),
1049 1050 1051
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1052 1053 1054
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1055 1056 1057 1058

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1059
			zap.Error(err),
1060
			zap.String("traceID", traceID),
1061
			zap.String("role", typeutil.ProxyRole),
1062 1063 1064
			zap.Int64("MsgID", dpt.ID()),
			zap.Uint64("BeginTS", dpt.BeginTs()),
			zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1065 1066 1067 1068
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1069
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1070

1071
		return &commonpb.Status{
1072
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1073 1074 1075
			Reason:    err.Error(),
		}, nil
	}
1076 1077 1078 1079

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1080
		zap.String("role", typeutil.ProxyRole),
1081 1082 1083 1084 1085 1086 1087
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1088 1089
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1090 1091 1092
	return dpt.result, nil
}

1093
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1094
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1095 1096 1097 1098 1099
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1100

D
dragondriver 已提交
1101
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasPartition")
D
dragondriver 已提交
1102 1103
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1104 1105 1106
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
X
Xiaofan 已提交
1107
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1108
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1109

1110
	hpt := &hasPartitionTask{
S
sunby 已提交
1111
		ctx:                 ctx,
1112 1113
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1114
		rootCoord:           node.rootCoord,
1115 1116 1117
		result:              nil,
	}

D
dragondriver 已提交
1118 1119 1120
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1121
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1122 1123 1124
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1125 1126 1127 1128 1129 1130

	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1131
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1132 1133 1134 1135
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1136
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1137
			metrics.AbandonLabel).Inc()
1138

1139 1140
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1141
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1142 1143 1144 1145 1146
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1147

D
dragondriver 已提交
1148 1149 1150
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1151
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1152 1153 1154
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1155 1156 1157
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1158 1159 1160 1161

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1162
			zap.Error(err),
D
dragondriver 已提交
1163
			zap.String("traceID", traceID),
1164
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1165 1166 1167
			zap.Int64("MsgID", hpt.ID()),
			zap.Uint64("BeginTS", hpt.BeginTs()),
			zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1168 1169 1170 1171
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1172
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1173
			metrics.FailLabel).Inc()
1174

1175 1176
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1177
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1178 1179 1180 1181 1182
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1183 1184 1185 1186

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1187
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1188 1189 1190 1191 1192 1193 1194
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1195
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1196
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1197
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1198 1199 1200
	return hpt.result, nil
}

1201
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1202
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1203 1204 1205
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1206

D
dragondriver 已提交
1207
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadPartitions")
1208 1209
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1210 1211
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
1212

1213
	lpt := &loadPartitionsTask{
G
godchen 已提交
1214 1215 1216
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1217
		queryCoord:            node.queryCoord,
1218 1219
	}

1220 1221 1222
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1223
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1224 1225 1226
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1227 1228 1229 1230 1231 1232

	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1233
			zap.String("role", typeutil.ProxyRole),
1234 1235 1236 1237
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1238
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1239
			metrics.AbandonLabel).Inc()
1240

1241
		return &commonpb.Status{
1242
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1243 1244 1245 1246
			Reason:    err.Error(),
		}, nil
	}

1247 1248 1249
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1250
		zap.String("role", typeutil.ProxyRole),
1251 1252 1253
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1254 1255 1256
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1257 1258 1259 1260

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1261
			zap.Error(err),
1262
			zap.String("traceID", traceID),
1263
			zap.String("role", typeutil.ProxyRole),
1264 1265 1266
			zap.Int64("MsgID", lpt.ID()),
			zap.Uint64("BeginTS", lpt.BeginTs()),
			zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1267 1268 1269 1270
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1271
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1272
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1273
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1274
			metrics.FailLabel).Inc()
1275

1276
		return &commonpb.Status{
1277
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1278 1279 1280 1281
			Reason:    err.Error(),
		}, nil
	}

1282 1283 1284
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1285
		zap.String("role", typeutil.ProxyRole),
1286 1287 1288 1289 1290 1291 1292
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1293
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1294
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1295
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1296
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1297
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1298
	return lpt.result, nil
1299 1300
}

1301
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1302
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1303 1304 1305
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1306 1307 1308 1309 1310

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleasePartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1311
	rpt := &releasePartitionsTask{
G
godchen 已提交
1312 1313 1314
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1315
		queryCoord:               node.queryCoord,
1316 1317
	}

1318
	method := "ReleasePartitions"
1319
	tr := timerecord.NewTimeRecorder(method)
1320 1321 1322 1323

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1324
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1325 1326 1327
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1328 1329 1330 1331 1332 1333

	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1334
			zap.String("role", typeutil.ProxyRole),
1335 1336 1337 1338
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1339
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1340
			metrics.AbandonLabel).Inc()
1341

1342
		return &commonpb.Status{
1343
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1344 1345 1346 1347
			Reason:    err.Error(),
		}, nil
	}

1348 1349 1350
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1351
		zap.String("role", typeutil.ProxyRole),
1352 1353 1354
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1355 1356 1357
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1358 1359 1360 1361

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1362
			zap.Error(err),
1363
			zap.String("traceID", traceID),
1364
			zap.String("role", typeutil.ProxyRole),
1365 1366 1367
			zap.Int64("msgID", rpt.Base.MsgID),
			zap.Uint64("BeginTS", rpt.BeginTs()),
			zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1368 1369 1370 1371
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1372
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1373
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1374
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1375
			metrics.FailLabel).Inc()
1376

1377
		return &commonpb.Status{
1378
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1379 1380 1381 1382
			Reason:    err.Error(),
		}, nil
	}

1383 1384 1385
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1386
		zap.String("role", typeutil.ProxyRole),
1387 1388 1389 1390 1391 1392 1393
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1394
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1395
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1396
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1397
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1398
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1399
	return rpt.result, nil
1400 1401
}

1402
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1403
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1404 1405 1406 1407 1408
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1409 1410 1411 1412 1413

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPartitionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1414
	g := &getPartitionStatisticsTask{
1415 1416 1417
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1418
		dataCoord:                     node.dataCoord,
1419 1420
	}

1421
	method := "GetPartitionStatistics"
1422
	tr := timerecord.NewTimeRecorder(method)
1423 1424 1425 1426

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1427
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1428 1429 1430
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1431 1432 1433 1434 1435 1436

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1437
			zap.String("role", typeutil.ProxyRole),
1438 1439 1440 1441
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1442
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1443
			metrics.AbandonLabel).Inc()
1444

1445 1446 1447 1448 1449 1450 1451 1452
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1453 1454 1455
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1456
		zap.String("role", typeutil.ProxyRole),
1457 1458 1459
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
1460 1461 1462
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1463 1464 1465 1466

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1467
			zap.Error(err),
1468
			zap.String("traceID", traceID),
1469
			zap.String("role", typeutil.ProxyRole),
1470 1471 1472
			zap.Int64("msgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
1473 1474 1475 1476
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1477
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1478
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1479
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1480
			metrics.FailLabel).Inc()
1481

1482 1483 1484 1485 1486 1487 1488 1489
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1490 1491 1492
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1493
		zap.String("role", typeutil.ProxyRole),
1494 1495 1496 1497 1498 1499 1500
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1501
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1502
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1503
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1504
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1505
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1506
	return g.result, nil
1507 1508
}

1509
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1510
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1511 1512 1513 1514 1515
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1516 1517 1518 1519 1520

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1521
	spt := &showPartitionsTask{
G
godchen 已提交
1522 1523 1524
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1525
		rootCoord:             node.rootCoord,
1526
		queryCoord:            node.queryCoord,
G
godchen 已提交
1527
		result:                nil,
1528 1529
	}

1530
	method := "ShowPartitions"
1531 1532
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
X
Xiaofan 已提交
1533
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1534
		metrics.TotalLabel).Inc()
1535 1536 1537 1538

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1539
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1540
		zap.Any("request", request))
1541 1542 1543 1544 1545 1546

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1547
			zap.String("role", typeutil.ProxyRole),
1548 1549
			zap.Any("request", request))

X
Xiaofan 已提交
1550
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1551
			metrics.AbandonLabel).Inc()
1552

G
godchen 已提交
1553
		return &milvuspb.ShowPartitionsResponse{
1554
			Status: &commonpb.Status{
1555
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1556 1557 1558 1559 1560
				Reason:    err.Error(),
			},
		}, nil
	}

1561 1562 1563
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1564
		zap.String("role", typeutil.ProxyRole),
1565 1566 1567
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1568 1569
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1570 1571 1572 1573 1574
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1575
			zap.Error(err),
1576
			zap.String("traceID", traceID),
1577
			zap.String("role", typeutil.ProxyRole),
1578 1579 1580 1581 1582 1583
			zap.Int64("msgID", spt.ID()),
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1584

X
Xiaofan 已提交
1585
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1586
			metrics.FailLabel).Inc()
1587

G
godchen 已提交
1588
		return &milvuspb.ShowPartitionsResponse{
1589
			Status: &commonpb.Status{
1590
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1591 1592 1593 1594
				Reason:    err.Error(),
			},
		}, nil
	}
1595 1596 1597 1598

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1599
		zap.String("role", typeutil.ProxyRole),
1600 1601 1602 1603 1604 1605 1606
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

X
Xiaofan 已提交
1607
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1608
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1609
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1610 1611 1612
	return spt.result, nil
}

1613
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1614
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1615 1616 1617
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1618 1619 1620 1621 1622

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1623
	cit := &createIndexTask{
S
sunby 已提交
1624
		ctx:                ctx,
1625 1626
		Condition:          NewTaskCondition(ctx),
		CreateIndexRequest: request,
1627
		rootCoord:          node.rootCoord,
1628 1629
	}

D
dragondriver 已提交
1630
	method := "CreateIndex"
1631
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
1632 1633 1634 1635

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1636
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1637 1638 1639 1640
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1641 1642 1643 1644 1645 1646

	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1647
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1648 1649 1650 1651 1652
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.Any("extra_params", request.ExtraParams))

X
Xiaofan 已提交
1653
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1654
			metrics.AbandonLabel).Inc()
1655

1656
		return &commonpb.Status{
1657
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1658 1659 1660 1661
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1662 1663 1664
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1665
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1666 1667 1668
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1669 1670 1671 1672
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1673 1674 1675 1676

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1677
			zap.Error(err),
D
dragondriver 已提交
1678
			zap.String("traceID", traceID),
1679
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1680 1681 1682
			zap.Int64("MsgID", cit.ID()),
			zap.Uint64("BeginTs", cit.BeginTs()),
			zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1683 1684 1685 1686 1687
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.Any("extra_params", request.ExtraParams))

X
Xiaofan 已提交
1688
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1689
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1690
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1691
			metrics.FailLabel).Inc()
1692

1693
		return &commonpb.Status{
1694
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1695 1696 1697 1698
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1699 1700 1701
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1702
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1703 1704 1705 1706 1707 1708 1709 1710
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))

X
Xiaofan 已提交
1711
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1712
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1713
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1714
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1715
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1716 1717 1718
	return cit.result, nil
}

1719
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1720
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1721 1722 1723 1724 1725
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1726 1727 1728 1729 1730

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1731
	dit := &describeIndexTask{
S
sunby 已提交
1732
		ctx:                  ctx,
1733 1734
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1735
		rootCoord:            node.rootCoord,
1736 1737
	}

1738 1739 1740
	method := "DescribeIndex"
	// avoid data race
	indexName := request.IndexName
1741
	tr := timerecord.NewTimeRecorder(method)
1742 1743 1744 1745

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1746
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1747 1748 1749
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1750 1751 1752 1753 1754 1755 1756
		zap.String("index name", indexName))

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1757
			zap.String("role", typeutil.ProxyRole),
1758 1759 1760 1761 1762
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", indexName))

X
Xiaofan 已提交
1763
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1764
			metrics.AbandonLabel).Inc()
1765

1766 1767
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1768
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1769 1770 1771 1772 1773
				Reason:    err.Error(),
			},
		}, nil
	}

1774 1775 1776
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1777
		zap.String("role", typeutil.ProxyRole),
1778 1779 1780
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1781 1782 1783
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1784 1785 1786 1787 1788
		zap.String("index name", indexName))

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1789
			zap.Error(err),
1790
			zap.String("traceID", traceID),
1791
			zap.String("role", typeutil.ProxyRole),
1792 1793 1794
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1795 1796 1797
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
1798
			zap.String("index name", indexName))
D
dragondriver 已提交
1799

Z
zhenshan.cao 已提交
1800 1801 1802 1803
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
X
Xiaofan 已提交
1804
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1805
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1806
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1807
			metrics.FailLabel).Inc()
1808

1809 1810
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1811
				ErrorCode: errCode,
1812 1813 1814 1815 1816
				Reason:    err.Error(),
			},
		}, nil
	}

1817 1818 1819
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1820
		zap.String("role", typeutil.ProxyRole),
1821 1822 1823 1824 1825 1826 1827 1828
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", indexName))

X
Xiaofan 已提交
1829
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1830
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1831
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1832
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1833
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1834 1835 1836
	return dit.result, nil
}

1837
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1838
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1839 1840 1841
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1842 1843 1844 1845 1846

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1847
	dit := &dropIndexTask{
S
sunby 已提交
1848
		ctx:              ctx,
B
BossZou 已提交
1849 1850
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1851
		rootCoord:        node.rootCoord,
B
BossZou 已提交
1852
	}
G
godchen 已提交
1853

D
dragondriver 已提交
1854
	method := "DropIndex"
1855
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
1856 1857 1858 1859

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1860
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1861 1862 1863 1864 1865
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

D
dragondriver 已提交
1866 1867 1868 1869 1870
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1871
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1872 1873 1874 1875
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
X
Xiaofan 已提交
1876
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1877
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1878

B
BossZou 已提交
1879
		return &commonpb.Status{
1880
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1881 1882 1883
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1884

D
dragondriver 已提交
1885 1886 1887
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1888
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1889 1890 1891
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1892 1893 1894 1895
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
D
dragondriver 已提交
1896 1897 1898 1899

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1900
			zap.Error(err),
D
dragondriver 已提交
1901
			zap.String("traceID", traceID),
1902
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1903 1904 1905
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1906 1907 1908 1909 1910
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

X
Xiaofan 已提交
1911
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1912
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1913
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1914
			metrics.FailLabel).Inc()
1915

B
BossZou 已提交
1916
		return &commonpb.Status{
1917
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1918 1919 1920
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1921 1922 1923 1924

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1925
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1926 1927 1928 1929 1930 1931 1932 1933
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

X
Xiaofan 已提交
1934
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1935
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1936
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1937
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1938
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1939 1940 1941
	return dit.result, nil
}

1942 1943
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
C
Cai Yudong 已提交
1944
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1945 1946 1947 1948 1949
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1950 1951 1952 1953 1954

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1955
	gibpt := &getIndexBuildProgressTask{
1956 1957 1958
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1959 1960
		indexCoord:                   node.indexCoord,
		rootCoord:                    node.rootCoord,
1961
		dataCoord:                    node.dataCoord,
1962 1963
	}

1964
	method := "GetIndexBuildProgress"
1965
	tr := timerecord.NewTimeRecorder(method)
1966 1967 1968 1969

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1970
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1971 1972 1973 1974
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1975 1976 1977 1978 1979 1980

	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1981
			zap.String("role", typeutil.ProxyRole),
1982 1983 1984 1985
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
X
Xiaofan 已提交
1986
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1987
			metrics.AbandonLabel).Inc()
1988

1989 1990 1991 1992 1993 1994 1995 1996
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1997 1998 1999
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2000
		zap.String("role", typeutil.ProxyRole),
2001 2002 2003
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
2004 2005 2006 2007
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2008 2009 2010 2011

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
2012
			zap.Error(err),
2013
			zap.String("traceID", traceID),
2014
			zap.String("role", typeutil.ProxyRole),
2015 2016 2017
			zap.Int64("MsgID", gibpt.ID()),
			zap.Uint64("BeginTs", gibpt.BeginTs()),
			zap.Uint64("EndTs", gibpt.EndTs()),
2018 2019 2020 2021
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
X
Xiaofan 已提交
2022
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2023
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2024
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2025
			metrics.FailLabel).Inc()
2026 2027 2028 2029 2030 2031 2032 2033

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
2034 2035 2036 2037

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2038
		zap.String("role", typeutil.ProxyRole),
2039 2040 2041 2042 2043 2044 2045 2046
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName),
		zap.Any("result", gibpt.result))
2047

X
Xiaofan 已提交
2048
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2049
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2050
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2051
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
2052
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2053
	return gibpt.result, nil
2054 2055
}

2056
// GetIndexState get the build-state of index.
C
Cai Yudong 已提交
2057
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
2058 2059 2060 2061 2062
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
2063 2064 2065 2066 2067

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2068
	dipt := &getIndexStateTask{
G
godchen 已提交
2069 2070 2071
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
2072 2073
		indexCoord:           node.indexCoord,
		rootCoord:            node.rootCoord,
2074 2075
	}

2076
	method := "GetIndexState"
2077
	tr := timerecord.NewTimeRecorder(method)
2078 2079 2080 2081

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2082
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2083 2084 2085 2086
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2087 2088 2089 2090 2091 2092

	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2093
			zap.String("role", typeutil.ProxyRole),
2094 2095 2096 2097 2098
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

X
Xiaofan 已提交
2099
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2100
			metrics.AbandonLabel).Inc()
2101

G
godchen 已提交
2102
		return &milvuspb.GetIndexStateResponse{
2103
			Status: &commonpb.Status{
2104
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2105 2106 2107 2108 2109
				Reason:    err.Error(),
			},
		}, nil
	}

2110 2111 2112
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2113
		zap.String("role", typeutil.ProxyRole),
2114 2115 2116
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2117 2118 2119 2120
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2121 2122 2123 2124

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2125
			zap.Error(err),
2126
			zap.String("traceID", traceID),
2127
			zap.String("role", typeutil.ProxyRole),
2128 2129 2130
			zap.Int64("MsgID", dipt.ID()),
			zap.Uint64("BeginTs", dipt.BeginTs()),
			zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2131 2132 2133 2134 2135
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

X
Xiaofan 已提交
2136
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2137
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2138
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2139
			metrics.FailLabel).Inc()
2140

G
godchen 已提交
2141
		return &milvuspb.GetIndexStateResponse{
2142
			Status: &commonpb.Status{
2143
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2144 2145 2146 2147 2148
				Reason:    err.Error(),
			},
		}, nil
	}

2149 2150 2151
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2152
		zap.String("role", typeutil.ProxyRole),
2153 2154 2155 2156 2157 2158 2159 2160
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

X
Xiaofan 已提交
2161
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2162
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2163
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2164
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
2165
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2166 2167 2168
	return dipt.result, nil
}

2169
// Insert insert records into collection.
C
Cai Yudong 已提交
2170
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
X
Xiangyu Wang 已提交
2171 2172 2173 2174 2175 2176
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
	log.Info("Start processing insert request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing insert request in Proxy", zap.String("traceID", traceID))

2177 2178 2179 2180 2181
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2182 2183
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
2184

2185 2186 2187 2188 2189
	defer func() {
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.TotalLabel).Inc()
	}()

2190
	it := &insertTask{
2191 2192
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2193
		// req:       request,
2194 2195 2196 2197
		BaseInsertTask: BaseInsertTask{
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2198
			InsertRequest: internalpb.InsertRequest{
2199
				Base: &commonpb.MsgBase{
X
xige-16 已提交
2200 2201
					MsgType:  commonpb.MsgType_Insert,
					MsgID:    0,
X
Xiaofan 已提交
2202
					SourceID: Params.ProxyCfg.GetNodeID(),
2203 2204 2205
				},
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2206 2207 2208
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        internalpb.InsertDataVersion_ColumnBased,
2209
				// RowData: transfer column based request to this
2210 2211
			},
		},
2212
		rowIDAllocator: node.idAllocator,
2213
		segIDAssigner:  node.segAssigner,
2214
		chMgr:          node.chMgr,
2215
		chTicker:       node.chTicker,
2216
	}
2217 2218

	if len(it.PartitionName) <= 0 {
2219
		it.PartitionName = Params.CommonCfg.DefaultPartitionName
2220 2221
	}

X
Xiangyu Wang 已提交
2222
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2223
		numRows := request.NumRows
2224 2225 2226 2227
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2228

X
Xiangyu Wang 已提交
2229 2230 2231 2232 2233 2234 2235
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2236 2237
	}

X
Xiangyu Wang 已提交
2238
	log.Debug("Enqueue insert request in Proxy",
2239
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2240 2241 2242 2243 2244
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2245 2246
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))
D
dragondriver 已提交
2247

X
Xiangyu Wang 已提交
2248 2249
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		log.Debug("Failed to enqueue insert task: " + err.Error())
2250 2251
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2252
		return constructFailedResponse(err), nil
2253
	}
D
dragondriver 已提交
2254

X
Xiangyu Wang 已提交
2255
	log.Debug("Detail of insert request in Proxy",
2256
		zap.String("role", typeutil.ProxyRole),
X
Xiangyu Wang 已提交
2257
		zap.Int64("msgID", it.Base.MsgID),
D
dragondriver 已提交
2258 2259 2260 2261 2262
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
X
Xiangyu Wang 已提交
2263 2264 2265 2266 2267
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))

	if err := it.WaitToFinish(); err != nil {
		log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
2268
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2269
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2270 2271 2272 2273 2274
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2275
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2287
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2288

2289
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2290
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
2291
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(it.result.InsertCnt))
2292
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
2293 2294 2295
	return it.result, nil
}

2296
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2297
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
2298 2299 2300
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
2301 2302
	log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
2303

G
groot 已提交
2304 2305 2306 2307 2308 2309
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2310 2311 2312
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

2313 2314
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
2315
	dt := &deleteTask{
X
xige-16 已提交
2316 2317 2318
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
G
godchen 已提交
2319
		BaseDeleteTask: BaseDeleteTask{
G
godchen 已提交
2320 2321 2322
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2323 2324 2325 2326 2327
			DeleteRequest: internalpb.DeleteRequest{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_Delete,
					MsgID:   0,
				},
X
xige-16 已提交
2328
				DbName:         request.DbName,
G
godchen 已提交
2329 2330 2331
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2332 2333 2334 2335
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2336 2337
	}

2338
	log.Debug("Enqueue delete request in Proxy",
2339
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2340 2341 2342 2343
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2344 2345 2346 2347

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
		log.Error("Failed to enqueue delete task: "+err.Error(), zap.String("traceID", traceID))
X
Xiaofan 已提交
2348
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2349
			metrics.FailLabel).Inc()
2350

G
groot 已提交
2351 2352 2353 2354 2355 2356 2357 2358
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2359
	log.Debug("Detail of delete request in Proxy",
2360
		zap.String("role", typeutil.ProxyRole),
G
groot 已提交
2361 2362 2363 2364 2365
		zap.Int64("msgID", dt.Base.MsgID),
		zap.Uint64("timestamp", dt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2366 2367
		zap.String("expr", request.Expr),
		zap.String("traceID", traceID))
G
groot 已提交
2368

2369 2370
	if err := dt.WaitToFinish(); err != nil {
		log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
X
Xiaofan 已提交
2371
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2372
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2373
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2374
			metrics.FailLabel).Inc()
G
groot 已提交
2375 2376 2377 2378 2379 2380 2381 2382
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

X
Xiaofan 已提交
2383
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2384
		metrics.SuccessLabel).Inc()
2385
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2386 2387 2388
	return dt.result, nil
}

2389
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2390
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2391 2392 2393 2394 2395
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2396 2397
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
2398 2399
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2400

C
cai.zhang 已提交
2401 2402
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
	defer sp.Finish()
D
dragondriver 已提交
2403 2404
	traceID, _, _ := trace.InfoFromSpan(sp)

2405
	qt := &searchTask{
S
sunby 已提交
2406
		ctx:       ctx,
2407
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2408
		SearchRequest: &internalpb.SearchRequest{
2409
			Base: &commonpb.MsgBase{
2410
				MsgType:  commonpb.MsgType_Search,
X
Xiaofan 已提交
2411
				SourceID: Params.ProxyCfg.GetNodeID(),
2412
			},
2413
			ReqID: Params.ProxyCfg.GetNodeID(),
2414
		},
2415 2416 2417 2418
		request:            request,
		qc:                 node.queryCoord,
		tr:                 timerecord.NewTimeRecorder("search"),
		getQueryNodePolicy: defaultGetQueryNodePolicy,
2419 2420
	}

2421 2422 2423 2424 2425
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

	log.Debug(
		rpcReceived(method),
D
dragondriver 已提交
2426
		zap.String("traceID", traceID),
2427
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2428 2429 2430 2431 2432
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2433 2434 2435 2436
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2437

2438 2439 2440
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
D
dragondriver 已提交
2441 2442
			zap.Error(err),
			zap.String("traceID", traceID),
2443
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2444 2445 2446 2447 2448 2449
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
			zap.Any("OutputFields", request.OutputFields),
2450 2451 2452
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2453

2454 2455
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
2456

2457 2458
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2459
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2460 2461 2462 2463
				Reason:    err.Error(),
			},
		}, nil
	}
2464
	tr.Record("search request enqueue")
2465

2466 2467
	log.Debug(
		rpcEnqueued(method),
D
dragondriver 已提交
2468
		zap.String("traceID", traceID),
2469
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2470
		zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2471 2472 2473 2474 2475
		zap.Uint64("timestamp", qt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
2476
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2477 2478 2479 2480
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2481

2482 2483 2484
	if err := qt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2485
			zap.Error(err),
D
dragondriver 已提交
2486
			zap.String("traceID", traceID),
2487
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2488
			zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2489 2490 2491 2492
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
2493
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2494 2495 2496 2497
			zap.Any("OutputFields", request.OutputFields),
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
2498

2499 2500
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
2501

2502 2503
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2504
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2505 2506 2507 2508 2509
				Reason:    err.Error(),
			},
		}, nil
	}

2510 2511 2512
	span := tr.Record("wait search result")
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
		metrics.SearchLabel).Observe(float64(span.Milliseconds()))
2513 2514
	log.Debug(
		rpcDone(method),
D
dragondriver 已提交
2515
		zap.String("traceID", traceID),
2516
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2517 2518 2519 2520 2521 2522
		zap.Int64("msgID", qt.ID()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2523 2524 2525 2526
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2527

2528 2529 2530
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
C
cai.zhang 已提交
2531
	searchDur := tr.ElapseSpan().Milliseconds()
X
Xiaofan 已提交
2532
	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
2533
		metrics.SearchLabel).Observe(float64(searchDur))
2534 2535 2536
	return qt.result, nil
}

2537
// Flush notify data nodes to persist the data of collection.
2538 2539 2540 2541 2542 2543 2544
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2545
	if !node.checkHealthy() {
2546 2547
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2548
	}
D
dragondriver 已提交
2549 2550 2551 2552 2553

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2554
	ft := &flushTask{
T
ThreadDao 已提交
2555 2556 2557
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2558
		dataCoord:    node.dataCoord,
2559 2560
	}

D
dragondriver 已提交
2561
	method := "Flush"
2562
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
2563
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2564 2565 2566 2567

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2568
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2569 2570
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2571 2572 2573 2574 2575 2576

	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2577
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2578 2579 2580
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

X
Xiaofan 已提交
2581
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2582

2583 2584
		resp.Status.Reason = err.Error()
		return resp, nil
2585 2586
	}

D
dragondriver 已提交
2587 2588 2589
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2590
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2591 2592 2593
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2594 2595
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2596 2597 2598 2599

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2600
			zap.Error(err),
D
dragondriver 已提交
2601
			zap.String("traceID", traceID),
2602
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2603 2604 2605
			zap.Int64("MsgID", ft.ID()),
			zap.Uint64("BeginTs", ft.BeginTs()),
			zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2606 2607 2608
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

X
Xiaofan 已提交
2609
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2610

D
dragondriver 已提交
2611
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2612 2613
		resp.Status.Reason = err.Error()
		return resp, nil
2614 2615
	}

D
dragondriver 已提交
2616 2617 2618
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2619
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2620 2621 2622 2623 2624 2625
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))

X
Xiaofan 已提交
2626 2627
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2628
	return ft.result, nil
2629 2630
}

2631
// Query get the records by primary keys.
C
Cai Yudong 已提交
2632
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2633 2634 2635 2636 2637
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2638

D
dragondriver 已提交
2639 2640 2641
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Query")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
2642
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2643

2644
	qt := &queryTask{
2645 2646 2647 2648 2649
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_Retrieve,
X
Xiaofan 已提交
2650
				SourceID: Params.ProxyCfg.GetNodeID(),
2651
			},
2652
			ReqID: Params.ProxyCfg.GetNodeID(),
2653
		},
2654 2655 2656 2657
		request:            request,
		qc:                 node.queryCoord,
		getQueryNodePolicy: defaultGetQueryNodePolicy,
		queryShardPolicy:   roundRobinPolicy,
2658 2659
	}

D
dragondriver 已提交
2660 2661
	method := "Query"

2662 2663 2664
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()

D
dragondriver 已提交
2665 2666 2667
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2668
		zap.String("role", typeutil.ProxyRole),
2669 2670 2671
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
G
godchen 已提交
2672

D
dragondriver 已提交
2673 2674 2675 2676 2677 2678
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
2679 2680 2681
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2682

2683 2684 2685
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()

2686 2687 2688 2689 2690 2691
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2692
	}
2693
	tr.Record("query request enqueue")
2694

D
dragondriver 已提交
2695 2696 2697
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2698
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2699 2700 2701
		zap.Int64("MsgID", qt.ID()),
		zap.Uint64("BeginTs", qt.BeginTs()),
		zap.Uint64("EndTs", qt.EndTs()),
2702 2703 2704
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2705 2706 2707 2708 2709 2710

	if err := qt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2711
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2712 2713 2714
			zap.Int64("MsgID", qt.ID()),
			zap.Uint64("BeginTs", qt.BeginTs()),
			zap.Uint64("EndTs", qt.EndTs()),
2715 2716 2717
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
2718

2719 2720
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
2721

2722 2723 2724 2725 2726 2727 2728
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
2729 2730 2731
	span := tr.Record("wait query result")
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
		metrics.QueryLabel).Observe(float64(span.Milliseconds()))
D
dragondriver 已提交
2732 2733 2734 2735 2736 2737 2738
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", qt.ID()),
		zap.Uint64("BeginTs", qt.BeginTs()),
		zap.Uint64("EndTs", qt.EndTs()),
2739 2740 2741
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2742

2743 2744 2745 2746
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()

	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
2747
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
2748 2749 2750 2751 2752
	return &milvuspb.QueryResults{
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
	}, nil
}
2753

2754
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2755 2756 2757 2758
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2759 2760 2761 2762 2763

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2764 2765 2766 2767 2768 2769 2770
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2771
	method := "CreateAlias"
2772
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
2773
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
2793
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2794

Y
Yusup 已提交
2795 2796 2797 2798 2799 2800
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2801 2802 2803
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2804
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2805 2806 2807 2808
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2809 2810
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
2811 2812 2813 2814

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2815
			zap.Error(err),
D
dragondriver 已提交
2816
			zap.String("traceID", traceID),
2817
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2818 2819 2820 2821
			zap.Int64("MsgID", cat.ID()),
			zap.Uint64("BeginTs", cat.BeginTs()),
			zap.Uint64("EndTs", cat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2822 2823
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
X
Xiaofan 已提交
2824
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2825 2826 2827 2828 2829 2830 2831

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
2843 2844
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2845 2846 2847
	return cat.result, nil
}

2848
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2849 2850 2851 2852
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2853 2854 2855 2856 2857

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2858 2859 2860 2861 2862 2863 2864
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2865
	method := "DropAlias"
2866
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
2867
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias))
X
Xiaofan 已提交
2884
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2885

Y
Yusup 已提交
2886 2887 2888 2889 2890 2891
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2892 2893 2894
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2895
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2896 2897 2898 2899
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2900
		zap.String("alias", request.Alias))
D
dragondriver 已提交
2901 2902 2903 2904

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2905
			zap.Error(err),
D
dragondriver 已提交
2906
			zap.String("traceID", traceID),
2907
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2908 2909 2910 2911
			zap.Int64("MsgID", dat.ID()),
			zap.Uint64("BeginTs", dat.BeginTs()),
			zap.Uint64("EndTs", dat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2912 2913
			zap.String("alias", request.Alias))

X
Xiaofan 已提交
2914
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2915

Y
Yusup 已提交
2916 2917 2918 2919 2920 2921
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2922 2923 2924 2925 2926 2927 2928 2929 2930 2931
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

X
Xiaofan 已提交
2932 2933
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2934 2935 2936
	return dat.result, nil
}

2937
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2938 2939 2940 2941
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2942 2943 2944 2945 2946

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2947 2948 2949 2950 2951 2952 2953
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2954
	method := "AlterAlias"
2955
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
2956
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
X
Xiaofan 已提交
2975
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2976

Y
Yusup 已提交
2977 2978 2979 2980 2981 2982
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2983 2984 2985
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2986
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2987 2988 2989 2990
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2991 2992
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
2993 2994 2995 2996

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2997
			zap.Error(err),
D
dragondriver 已提交
2998
			zap.String("traceID", traceID),
2999
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3000 3001 3002 3003
			zap.Int64("MsgID", aat.ID()),
			zap.Uint64("BeginTs", aat.BeginTs()),
			zap.Uint64("EndTs", aat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
3004 3005 3006
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
3007
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3008

Y
Yusup 已提交
3009 3010 3011 3012 3013 3014
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
3026 3027
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
3028 3029 3030
	return aat.result, nil
}

3031
// CalcDistance calculates the distances between vectors.
3032
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
3033 3034 3035 3036 3037
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
3038

3039 3040 3041 3042
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CalcDistance")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

3043 3044
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
3045

3046 3047 3048 3049 3050
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
3051 3052
		}

3053
		qt := &queryTask{
3054 3055 3056 3057 3058
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_Retrieve,
X
Xiaofan 已提交
3059
					SourceID: Params.ProxyCfg.GetNodeID(),
3060
				},
3061
				ReqID: Params.ProxyCfg.GetNodeID(),
3062
			},
3063 3064 3065 3066 3067 3068
			request: queryRequest,
			qc:      node.queryCoord,
			ids:     ids.IdArray,

			getQueryNodePolicy: defaultGetQueryNodePolicy,
			queryShardPolicy:   roundRobinPolicy,
3069 3070
		}

G
groot 已提交
3071 3072 3073 3074 3075 3076
		items := []zapcore.Field{
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
			zap.Any("OutputFields", queryRequest.OutputFields),
		}

3077
		err := node.sched.dqQueue.Enqueue(qt)
3078
		if err != nil {
G
groot 已提交
3079
			log.Error("CalcDistance queryTask failed to enqueue", append(items, zap.Error(err))...)
3080

3081 3082 3083 3084 3085
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3086
			}, err
3087
		}
3088

G
groot 已提交
3089
		log.Debug("CalcDistance queryTask enqueued", items...)
3090 3091 3092

		err = qt.WaitToFinish()
		if err != nil {
G
groot 已提交
3093
			log.Error("CalcDistance queryTask failed to WaitToFinish", append(items, zap.Error(err))...)
3094 3095 3096 3097 3098 3099

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3100
			}, err
3101
		}
3102

G
groot 已提交
3103
		log.Debug("CalcDistance queryTask Done", items...)
3104 3105

		return &milvuspb.QueryResults{
3106 3107
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
3108 3109 3110
		}, nil
	}

G
groot 已提交
3111 3112 3113 3114
	// calcDistanceTask is not a standard task, no need to enqueue
	task := &calcDistanceTask{
		traceID:   traceID,
		queryFunc: query,
3115 3116
	}

G
groot 已提交
3117
	return task.Execute(ctx, request)
3118 3119
}

3120
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
3121
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
3122 3123
	panic("implement me")
}
X
XuanYang-cn 已提交
3124

3125
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
3126
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
D
dragondriver 已提交
3127
	log.Debug("GetPersistentSegmentInfo",
3128
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3129 3130 3131
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3132
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
3133
		Status: &commonpb.Status{
3134
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
3135 3136
		},
	}
3137 3138 3139 3140
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3141 3142
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
3143
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
3144
		metrics.TotalLabel).Inc()
G
godchen 已提交
3145
	segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName)
X
XuanYang-cn 已提交
3146
	if err != nil {
3147
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3148 3149
		return resp, nil
	}
3150
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
X
XuanYang-cn 已提交
3151
		Base: &commonpb.MsgBase{
3152
			MsgType:   commonpb.MsgType_SegmentInfo,
X
XuanYang-cn 已提交
3153 3154
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3155
			SourceID:  Params.ProxyCfg.GetNodeID(),
X
XuanYang-cn 已提交
3156 3157 3158 3159
		},
		SegmentIDs: segments,
	})
	if err != nil {
3160
		log.Debug("GetPersistentSegmentInfo fail", zap.Error(err))
3161
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3162 3163
		return resp, nil
	}
3164
	log.Debug("GetPersistentSegmentInfo ", zap.Int("len(infos)", len(infoResp.Infos)), zap.Any("status", infoResp.Status))
3165
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3166 3167 3168 3169 3170 3171
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3172
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3173 3174
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3175
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3176 3177 3178
			State:        info.State,
		}
	}
X
Xiaofan 已提交
3179
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
3180
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
3181
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3182
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3183 3184 3185 3186
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3187
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3188
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
D
dragondriver 已提交
3189
	log.Debug("GetQuerySegmentInfo",
3190
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3191 3192 3193
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3194
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3195
		Status: &commonpb.Status{
3196
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3197 3198
		},
	}
3199 3200 3201 3202
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3203

3204 3205 3206 3207 3208
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3209
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
Z
zhenshan.cao 已提交
3210
		Base: &commonpb.MsgBase{
3211
			MsgType:   commonpb.MsgType_SegmentInfo,
Z
zhenshan.cao 已提交
3212 3213
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3214
			SourceID:  Params.ProxyCfg.GetNodeID(),
Z
zhenshan.cao 已提交
3215
		},
3216
		CollectionID: collID,
Z
zhenshan.cao 已提交
3217 3218
	})
	if err != nil {
3219
		log.Error("Failed to get segment info from QueryCoord",
3220
			zap.Error(err))
Z
zhenshan.cao 已提交
3221 3222 3223
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3224
	log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status))
3225
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
3226
		log.Error("Failed to get segment info from QueryCoord", zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
X
xige-16 已提交
3240
			State:        info.SegmentState,
3241
			NodeIds:      info.NodeIds,
Z
zhenshan.cao 已提交
3242 3243
		}
	}
3244
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3245 3246 3247 3248
	resp.Infos = queryInfos
	return resp, nil
}

C
Cai Yudong 已提交
3249
func (node *Proxy) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
3250
	describeCollectionResponse, err := node.rootCoord.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
X
XuanYang-cn 已提交
3251
		Base: &commonpb.MsgBase{
3252
			MsgType:   commonpb.MsgType_DescribeCollection,
X
XuanYang-cn 已提交
3253 3254
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3255
			SourceID:  Params.ProxyCfg.GetNodeID(),
X
XuanYang-cn 已提交
3256 3257 3258 3259 3260 3261 3262
		},
		DbName:         dbName,
		CollectionName: collectionName,
	})
	if err != nil {
		return nil, err
	}
3263
	if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3264 3265 3266
		return nil, errors.New(describeCollectionResponse.Status.Reason)
	}
	collectionID := describeCollectionResponse.CollectionID
3267
	showPartitionsResp, err := node.rootCoord.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
X
XuanYang-cn 已提交
3268
		Base: &commonpb.MsgBase{
3269
			MsgType:   commonpb.MsgType_ShowPartitions,
X
XuanYang-cn 已提交
3270 3271
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3272
			SourceID:  Params.ProxyCfg.GetNodeID(),
X
XuanYang-cn 已提交
3273 3274 3275 3276 3277 3278 3279 3280
		},
		DbName:         dbName,
		CollectionName: collectionName,
		CollectionID:   collectionID,
	})
	if err != nil {
		return nil, err
	}
3281
	if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3282 3283 3284 3285 3286
		return nil, errors.New(showPartitionsResp.Status.Reason)
	}

	ret := make([]UniqueID, 0)
	for _, partitionID := range showPartitionsResp.PartitionIDs {
3287
		showSegmentResponse, err := node.rootCoord.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{
X
XuanYang-cn 已提交
3288
			Base: &commonpb.MsgBase{
3289
				MsgType:   commonpb.MsgType_ShowSegments,
X
XuanYang-cn 已提交
3290 3291
				MsgID:     0,
				Timestamp: 0,
X
Xiaofan 已提交
3292
				SourceID:  Params.ProxyCfg.GetNodeID(),
X
XuanYang-cn 已提交
3293 3294 3295 3296 3297 3298 3299
			},
			CollectionID: collectionID,
			PartitionID:  partitionID,
		})
		if err != nil {
			return nil, err
		}
3300
		if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3301 3302 3303 3304 3305 3306
			return nil, errors.New(showSegmentResponse.Status.Reason)
		}
		ret = append(ret, showSegmentResponse.SegmentIDs...)
	}
	return ret, nil
}
3307

J
jingkl 已提交
3308
// Dummy handles dummy request
C
Cai Yudong 已提交
3309
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
	if err != nil {
		log.Debug("Failed to parse dummy request type")
		return failedResponse, nil
	}

3321 3322
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3323
		if err != nil {
3324
			log.Debug("Failed to parse dummy query request")
3325 3326 3327
			return failedResponse, nil
		}

3328
		request := &milvuspb.QueryRequest{
3329 3330 3331
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3332
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3333 3334
		}

3335
		_, err = node.Query(ctx, request)
3336
		if err != nil {
3337
			log.Debug("Failed to execute dummy query")
3338 3339
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3340 3341 3342 3343 3344 3345

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3346 3347
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3348 3349
}

J
jingkl 已提交
3350
// RegisterLink registers a link
C
Cai Yudong 已提交
3351
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
G
godchen 已提交
3352
	code := node.stateCode.Load().(internalpb.StateCode)
D
dragondriver 已提交
3353
	log.Debug("RegisterLink",
3354
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3355
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3356

G
godchen 已提交
3357
	if code != internalpb.StateCode_Healthy {
3358 3359 3360
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3361
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3362
				Reason:    "proxy not healthy",
3363 3364 3365
			},
		}, nil
	}
X
Xiaofan 已提交
3366
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Inc()
3367 3368 3369
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3370
			ErrorCode: commonpb.ErrorCode_Success,
3371
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3372 3373 3374
		},
	}, nil
}
3375

3376
// GetMetrics gets the metrics of proxy
3377 3378 3379
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	log.Debug("Proxy.GetMetrics",
X
Xiaofan 已提交
3380
		zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3381 3382 3383 3384
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
X
Xiaofan 已提交
3385
			zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3386
			zap.String("req", req.Request),
X
Xiaofan 已提交
3387
			zap.Error(errProxyIsUnhealthy(Params.ProxyCfg.GetNodeID())))
3388 3389 3390 3391

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
Xiaofan 已提交
3392
				Reason:    msgProxyIsUnhealthy(Params.ProxyCfg.GetNodeID()),
3393 3394 3395 3396 3397 3398 3399 3400
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
X
Xiaofan 已提交
3401
			zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412 3413 3414 3415 3416
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	log.Debug("Proxy.GetMetrics",
		zap.String("metric_type", metricType))

D
dragondriver 已提交
3417 3418 3419 3420 3421 3422 3423 3424 3425 3426
	msgID := UniqueID(0)
	msgID, err = node.idAllocator.AllocOne()
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to allocate id",
			zap.Error(err))
	}
	req.Base = &commonpb.MsgBase{
		MsgType:   commonpb.MsgType_SystemInfo,
		MsgID:     msgID,
		Timestamp: 0,
X
Xiaofan 已提交
3427
		SourceID:  Params.ProxyCfg.GetNodeID(),
D
dragondriver 已提交
3428 3429
	}

3430
	if metricType == metricsinfo.SystemInfoMetrics {
3431 3432 3433 3434 3435 3436 3437
		ret, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

3438
		metrics, err := getSystemInfoMetrics(ctx, req, node)
3439 3440

		log.Debug("Proxy.GetMetrics",
X
Xiaofan 已提交
3441
			zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3442 3443 3444 3445 3446
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3447 3448
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3449
		return metrics, nil
3450 3451 3452
	}

	log.Debug("Proxy.GetMetrics failed, request metric type is not implemented yet",
X
Xiaofan 已提交
3453
		zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

B
bigsheeper 已提交
3466 3467 3468
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
	log.Debug("Proxy.LoadBalance",
X
Xiaofan 已提交
3469
		zap.Int64("proxy_id", Params.ProxyCfg.GetNodeID()),
B
bigsheeper 已提交
3470 3471 3472 3473 3474 3475 3476 3477 3478
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
3479 3480 3481 3482 3483 3484 3485

	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
		log.Error("failed to get collection id", zap.String("collection name", req.GetCollectionName()), zap.Error(err))
		status.Reason = err.Error()
		return status, nil
	}
B
bigsheeper 已提交
3486 3487 3488 3489 3490
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_LoadBalanceSegments,
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3491
			SourceID:  Params.ProxyCfg.GetNodeID(),
B
bigsheeper 已提交
3492 3493 3494
		},
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3495
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3496
		SealedSegmentIDs: req.SealedSegmentIDs,
3497
		CollectionID:     collectionID,
B
bigsheeper 已提交
3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514
	})
	if err != nil {
		log.Error("Failed to LoadBalance from Query Coordinator",
			zap.Any("req", req), zap.Error(err))
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
		log.Error("Failed to LoadBalance from Query Coordinator", zap.String("errMsg", infoResp.Reason))
		status.Reason = infoResp.Reason
		return status, nil
	}
	log.Debug("LoadBalance Done", zap.Any("req", req), zap.Any("status", infoResp))
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

J
jingkl 已提交
3515
//GetCompactionState gets the compaction state of multiple segments
3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
	log.Info("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
	log.Info("received GetCompactionState response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

3529
// ManualCompaction invokes compaction on specified collection
3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
	log.Info("received ManualCompaction request", zap.Int64("collectionID", req.GetCollectionID()))
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
	log.Info("received ManualCompaction response", zap.Int64("collectionID", req.GetCollectionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

3543
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
	log.Info("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
	log.Info("received GetCompactionStateWithPlans response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

B
Bingyi Sun 已提交
3557 3558 3559
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
	log.Info("received get flush state request", zap.Any("request", req))
3560
	var err error
B
Bingyi Sun 已提交
3561 3562 3563 3564 3565 3566 3567
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		log.Info("unable to get flush state because of closed server")
		return resp, nil
	}

3568
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3569 3570 3571 3572
	if err != nil {
		log.Info("failed to get flush state response", zap.Error(err))
		return nil, err
	}
B
Bingyi Sun 已提交
3573 3574 3575 3576
	log.Info("received get flush state response", zap.Any("response", resp))
	return resp, err
}

C
Cai Yudong 已提交
3577 3578
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3579 3580 3581 3582
	code := node.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

J
jingkl 已提交
3583
//unhealthyStatus returns the proxy not healthy status
3584 3585 3586
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3587
		Reason:    "proxy not healthy",
3588 3589
	}
}
G
groot 已提交
3590 3591 3592

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
3593 3594 3595
	log.Info("received import request",
		zap.String("collection name", req.GetCollectionName()),
		zap.Bool("row-based", req.GetRowBased()))
3596 3597 3598 3599 3600 3601
	resp := &milvuspb.ImportResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
G
groot 已提交
3602 3603 3604 3605
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617
	// Get collection ID and then channel names.
	collID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
		log.Error("collection ID not found",
			zap.String("collection name", req.GetCollectionName()),
			zap.Error(err))
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, err
	}
	chNames, err := node.chMgr.getVChannels(collID)
	if err != nil {
3618 3619 3620 3621 3622 3623 3624 3625
		err = node.chMgr.createDMLMsgStream(collID)
		if err != nil {
			return nil, err
		}
		chNames, err = node.chMgr.getVChannels(collID)
		if err != nil {
			return nil, err
		}
3626 3627
	}
	req.ChannelNames = chNames
3628 3629 3630
	if req.GetPartitionName() == "" {
		req.PartitionName = Params.CommonCfg.DefaultPartitionName
	}
3631 3632
	// Call rootCoord to finish import.
	resp, err = node.rootCoord.Import(ctx, req)
G
groot 已提交
3633
	return resp, err
G
groot 已提交
3634 3635
}

G
groot 已提交
3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663
// GetImportState checks import task state from datanode
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
	log.Info("received get import state request", zap.Int64("taskID", req.GetTask()))
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.rootCoord.GetImportState(ctx, req)
	log.Info("received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
	log.Info("received list import tasks request")
	resp := &milvuspb.ListImportTasksResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.rootCoord.ListImportTasks(ctx, req)
	log.Info("received list import tasks response")
	return resp, err
}

X
XuanYang-cn 已提交
3664 3665 3666 3667 3668 3669 3670 3671 3672
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
	log.Info("received get replicas request")
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

3673 3674
	req.Base = &commonpb.MsgBase{
		MsgType:  commonpb.MsgType_GetReplicas,
X
Xiaofan 已提交
3675
		SourceID: Params.ProxyCfg.GetNodeID(),
3676 3677
	}

X
XuanYang-cn 已提交
3678 3679 3680 3681 3682
	resp, err := node.queryCoord.GetReplicas(ctx, req)
	log.Info("received get replicas response", zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

3683 3684 3685 3686 3687 3688 3689 3690 3691 3692 3693 3694 3695 3696 3697 3698 3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709 3710 3711 3712 3713 3714 3715 3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726 3727 3728 3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785 3786 3787 3788 3789 3790 3791 3792
// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to invalidate credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to invalidate credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to update credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	credInfo := &internalpb.CredentialInfo{
		Username:          request.Username,
		EncryptedPassword: request.Password,
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to update credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) ClearCredUsersCache(ctx context.Context, request *internalpb.ClearCredUsersCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to clear credential usernames cache",
		zap.String("role", typeutil.ProxyRole))

	if globalMetaCache != nil {
		globalMetaCache.ClearCredUsers() // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to clear credential usernames cache",
		zap.String("role", typeutil.ProxyRole))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
	log.Debug("CreateCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", req.Username))
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
		log.Error("decode password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
		log.Error("illegal password", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
		log.Error("encrypt password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
		log.Error("create credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

C
codeman 已提交
3793
func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCredentialRequest) (*commonpb.Status, error) {
3794
	log.Debug("UpdateCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", req.Username))
C
codeman 已提交
3795 3796 3797 3798 3799 3800 3801 3802 3803
	rawOldPassword, err := crypto.Base64Decode(req.OldPassword)
	if err != nil {
		log.Error("decode old password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode old password fail when updating:" + req.Username,
		}, nil
	}
	rawNewPassword, err := crypto.Base64Decode(req.NewPassword)
3804 3805 3806 3807 3808 3809 3810
	if err != nil {
		log.Error("decode password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3811 3812
	// valid new password
	if err = ValidatePassword(rawNewPassword); err != nil {
3813 3814 3815 3816 3817 3818
		log.Error("illegal password", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
C
codeman 已提交
3819 3820 3821 3822 3823 3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834 3835
	// check old password is correct
	oldCredInfo, err := globalMetaCache.GetCredentialInfo(ctx, req.Username)
	if err != nil {
		log.Error("found no credential", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "found no credential:" + req.Username,
		}, nil
	}
	if !crypto.PasswordVerify(rawOldPassword, oldCredInfo.EncryptedPassword) {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "old password is not correct:" + req.Username,
		}, nil
	}
	// update meta data
	encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword)
3836 3837 3838 3839 3840 3841 3842
	if err != nil {
		log.Error("encrypt password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3843
	updateCredReq := &internalpb.CredentialInfo{
3844 3845 3846
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
	}
C
codeman 已提交
3847
	result, err := node.rootCoord.UpdateCredential(ctx, updateCredReq)
3848 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859
	if err != nil { // for error like conntext timeout etc.
		log.Error("update credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
	log.Debug("DeleteCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", req.Username))
3860 3861 3862 3863 3864 3865
	if req.Username == util.UserRoot {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure,
			Reason:    "user root cannot be deleted",
		}, nil
	}
3866 3867 3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
		log.Error("delete credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
	log.Debug("ListCredUsers", zap.String("role", typeutil.RootCoordRole))
	// get from cache
	usernames, err := globalMetaCache.GetCredUsernames(ctx)
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Usernames: usernames,
	}, nil
}
3896 3897 3898 3899 3900 3901 3902 3903 3904 3905 3906 3907 3908 3909 3910 3911

// SendSearchResult needs to be removed TODO
func (node *Proxy) SendSearchResult(ctx context.Context, req *internalpb.SearchResults) (*commonpb.Status, error) {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
		Reason:    "Not implemented",
	}, nil
}

// SendRetrieveResult needs to be removed TODO
func (node *Proxy) SendRetrieveResult(ctx context.Context, req *internalpb.RetrieveResults) (*commonpb.Status, error) {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
		Reason:    "Not implemented",
	}, nil
}