task.go 58.4 KB
Newer Older
1
package proxynode
Z
zhenshan.cao 已提交
2 3

import (
G
godchen 已提交
4
	"context"
5
	"errors"
6
	"fmt"
N
neza2017 已提交
7
	"math"
Z
zhenshan.cao 已提交
8
	"regexp"
9
	"runtime"
N
neza2017 已提交
10
	"strconv"
11 12 13
	"time"

	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
N
neza2017 已提交
14

15
	"go.uber.org/zap"
S
sunby 已提交
16

N
neza2017 已提交
17
	"github.com/golang/protobuf/proto"
18
	"github.com/zilliztech/milvus-distributed/internal/allocator"
19
	"github.com/zilliztech/milvus-distributed/internal/log"
20 21
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
22 23
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
	"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
G
godchen 已提交
24
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
25
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
26
	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
N
neza2017 已提交
27
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
28
	"github.com/zilliztech/milvus-distributed/internal/types"
C
cai.zhang 已提交
29
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
Z
zhenshan.cao 已提交
30 31
)

S
sunby 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
// Human-readable names of the task kinds handled by the proxy node. A task's
// Name method returns the matching constant (e.g. InsertTask.Name returns
// InsertTaskName).
const (
	InsertTaskName                  = "InsertTask"
	CreateCollectionTaskName        = "CreateCollectionTask"
	DropCollectionTaskName          = "DropCollectionTask"
	SearchTaskName                  = "SearchTask"
	HasCollectionTaskName           = "HasCollectionTask"
	DescribeCollectionTaskName      = "DescribeCollectionTask"
	GetCollectionStatisticsTaskName = "GetCollectionStatisticsTask"
	ShowCollectionTaskName          = "ShowCollectionTask"
	CreatePartitionTaskName         = "CreatePartitionTask"
	DropPartitionTaskName           = "DropPartitionTask"
	HasPartitionTaskName            = "HasPartitionTask"
	ShowPartitionTaskName           = "ShowPartitionTask"
	CreateIndexTaskName             = "CreateIndexTask"
	DescribeIndexTaskName           = "DescribeIndexTask"
	DropIndexTaskName               = "DropIndexTask"
	GetIndexStateTaskName           = "GetIndexStateTask"
	FlushTaskName                   = "FlushTask"
	LoadCollectionTaskName          = "LoadCollectionTask"
	ReleaseCollectionTaskName       = "ReleaseCollectionTask"
	LoadPartitionTaskName           = "LoadPartitionTask"
	ReleasePartitionTaskName        = "ReleasePartitionTask"
)

Z
zhenshan.cao 已提交
56
type task interface {
57
	TraceCtx() context.Context
58 59
	ID() UniqueID       // return ReqID
	SetID(uid UniqueID) // set ReqID
S
sunby 已提交
60
	Name() string
61
	Type() commonpb.MsgType
62 63
	BeginTs() Timestamp
	EndTs() Timestamp
Z
zhenshan.cao 已提交
64
	SetTs(ts Timestamp)
65
	OnEnqueue() error
S
sunby 已提交
66 67 68
	PreExecute(ctx context.Context) error
	Execute(ctx context.Context) error
	PostExecute(ctx context.Context) error
Z
zhenshan.cao 已提交
69
	WaitToFinish() error
70
	Notify(err error)
Z
zhenshan.cao 已提交
71 72
}

73
type BaseInsertTask = msgstream.InsertMsg
74 75

type InsertTask struct {
76
	BaseInsertTask
D
dragondriver 已提交
77
	Condition
T
ThreadDao 已提交
78 79 80 81
	ctx            context.Context
	dataService    types.DataService
	result         *milvuspb.InsertResponse
	rowIDAllocator *allocator.IDAllocator
82 83
}

84
func (it *InsertTask) TraceCtx() context.Context {
S
sunby 已提交
85 86 87 88 89
	return it.ctx
}

func (it *InsertTask) ID() UniqueID {
	return it.Base.MsgID
90 91
}

92
func (it *InsertTask) SetID(uid UniqueID) {
93
	it.Base.MsgID = uid
94 95
}

S
sunby 已提交
96 97 98 99 100 101 102 103 104 105 106 107
func (it *InsertTask) Name() string {
	return InsertTaskName
}

func (it *InsertTask) Type() commonpb.MsgType {
	return it.Base.MsgType
}

func (it *InsertTask) BeginTs() Timestamp {
	return it.BeginTimestamp
}

108
func (it *InsertTask) SetTs(ts Timestamp) {
N
neza2017 已提交
109 110 111 112 113 114 115
	rowNum := len(it.RowData)
	it.Timestamps = make([]uint64, rowNum)
	for index := range it.Timestamps {
		it.Timestamps[index] = ts
	}
	it.BeginTimestamp = ts
	it.EndTimestamp = ts
116 117 118
}

func (it *InsertTask) EndTs() Timestamp {
N
neza2017 已提交
119
	return it.EndTimestamp
120 121
}

S
sunby 已提交
122
func (it *InsertTask) OnEnqueue() error {
D
dragondriver 已提交
123
	it.BaseInsertTask.InsertRequest.Base = &commonpb.MsgBase{}
S
sunby 已提交
124
	return nil
125 126
}

S
sunby 已提交
127
func (it *InsertTask) PreExecute(ctx context.Context) error {
128
	it.Base.MsgType = commonpb.MsgType_Insert
129
	it.Base.SourceID = Params.ProxyID
130

N
neza2017 已提交
131 132 133 134
	collectionName := it.BaseInsertTask.CollectionName
	if err := ValidateCollectionName(collectionName); err != nil {
		return err
	}
135
	partitionTag := it.BaseInsertTask.PartitionName
N
neza2017 已提交
136 137 138 139
	if err := ValidatePartitionTag(partitionTag, true); err != nil {
		return err
	}

140 141 142
	return nil
}

S
sunby 已提交
143
func (it *InsertTask) Execute(ctx context.Context) error {
144
	collectionName := it.BaseInsertTask.CollectionName
G
godchen 已提交
145
	collSchema, err := globalMetaCache.GetCollectionSchema(ctx, collectionName)
G
godchen 已提交
146 147
	if err != nil {
		return err
148
	}
G
godchen 已提交
149
	autoID := collSchema.AutoID
G
godchen 已提交
150
	collID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
G
godchen 已提交
151
	if err != nil {
152 153
		return err
	}
G
godchen 已提交
154
	it.CollectionID = collID
155 156
	var partitionID UniqueID
	if len(it.PartitionName) > 0 {
G
godchen 已提交
157
		partitionID, err = globalMetaCache.GetPartitionID(ctx, collectionName, it.PartitionName)
158 159 160 161
		if err != nil {
			return err
		}
	} else {
162
		partitionID, err = globalMetaCache.GetPartitionID(ctx, collectionName, Params.DefaultPartitionName)
163 164 165
		if err != nil {
			return err
		}
G
godchen 已提交
166 167
	}
	it.PartitionID = partitionID
168 169
	var rowIDBegin UniqueID
	var rowIDEnd UniqueID
G
godchen 已提交
170 171
	rowNums := len(it.BaseInsertTask.RowData)
	rowIDBegin, rowIDEnd, _ = it.rowIDAllocator.Alloc(uint32(rowNums))
172

G
godchen 已提交
173 174 175 176 177 178 179
	it.BaseInsertTask.RowIDs = make([]UniqueID, rowNums)
	for i := rowIDBegin; i < rowIDEnd; i++ {
		offset := i - rowIDBegin
		it.BaseInsertTask.RowIDs[offset] = i
	}

	if autoID {
N
neza2017 已提交
180 181 182
		if it.HashValues == nil || len(it.HashValues) == 0 {
			it.HashValues = make([]uint32, 0)
		}
G
godchen 已提交
183 184
		for _, rowID := range it.RowIDs {
			hashValue, _ := typeutil.Hash32Int64(rowID)
N
neza2017 已提交
185
			it.HashValues = append(it.HashValues, hashValue)
186 187 188
		}
	}

189
	var tsMsg msgstream.TsMsg = &it.BaseInsertTask
190 191
	it.BaseMsg.Ctx = ctx
	msgPack := msgstream.MsgPack{
192 193
		BeginTs: it.BeginTs(),
		EndTs:   it.EndTs(),
X
xige-16 已提交
194
		Msgs:    make([]msgstream.TsMsg, 1),
195
	}
G
godchen 已提交
196

197
	it.result = &milvuspb.InsertResponse{
198
		Status: &commonpb.Status{
199
			ErrorCode: commonpb.ErrorCode_Success,
200
		},
201 202
		RowIDBegin: rowIDBegin,
		RowIDEnd:   rowIDEnd,
203
	}
204 205 206

	msgPack.Msgs[0] = tsMsg

207
	stream, err := globalInsertChannelsMap.GetInsertMsgStream(collID)
208
	if err != nil {
G
godchen 已提交
209
		resp, _ := it.dataService.GetInsertChannels(ctx, &datapb.GetInsertChannelsRequest{
210
			Base: &commonpb.MsgBase{
211 212 213
				MsgType:   commonpb.MsgType_Insert, // todo
				MsgID:     it.Base.MsgID,           // todo
				Timestamp: 0,                       // todo
214 215 216
				SourceID:  Params.ProxyID,
			},
			DbID:         0, // todo
G
godchen 已提交
217
			CollectionID: collID,
218
		})
G
godchen 已提交
219 220 221
		if resp == nil {
			return errors.New("get insert channels resp is nil")
		}
222
		if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
223
			return errors.New(resp.Status.Reason)
224
		}
225
		err = globalInsertChannelsMap.CreateInsertMsgStream(collID, resp.Values)
226 227 228 229
		if err != nil {
			return err
		}
	}
230
	stream, err = globalInsertChannelsMap.GetInsertMsgStream(collID)
231
	if err != nil {
232
		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
233 234 235 236
		it.result.Status.Reason = err.Error()
		return err
	}

237
	err = stream.Produce(&msgPack)
238
	if err != nil {
239
		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
240
		it.result.Status.Reason = err.Error()
241
		return err
242
	}
243

244 245 246
	return nil
}

S
sunby 已提交
247
func (it *InsertTask) PostExecute(ctx context.Context) error {
248 249 250 251
	return nil
}

type CreateCollectionTask struct {
D
dragondriver 已提交
252
	Condition
253
	*milvuspb.CreateCollectionRequest
S
sunby 已提交
254
	ctx               context.Context
T
ThreadDao 已提交
255 256
	masterService     types.MasterService
	dataServiceClient types.DataService
257 258
	result            *commonpb.Status
	schema            *schemapb.CollectionSchema
259 260
}

261
func (cct *CreateCollectionTask) TraceCtx() context.Context {
S
sunby 已提交
262
	return cct.ctx
263 264
}

C
cai.zhang 已提交
265
func (cct *CreateCollectionTask) ID() UniqueID {
266
	return cct.Base.MsgID
267 268
}

269
func (cct *CreateCollectionTask) SetID(uid UniqueID) {
270
	cct.Base.MsgID = uid
271 272
}

S
sunby 已提交
273 274 275 276
func (cct *CreateCollectionTask) Name() string {
	return CreateCollectionTaskName
}

277
func (cct *CreateCollectionTask) Type() commonpb.MsgType {
278
	return cct.Base.MsgType
279 280 281
}

func (cct *CreateCollectionTask) BeginTs() Timestamp {
282
	return cct.Base.Timestamp
283 284 285
}

func (cct *CreateCollectionTask) EndTs() Timestamp {
286
	return cct.Base.Timestamp
287 288 289
}

func (cct *CreateCollectionTask) SetTs(ts Timestamp) {
290
	cct.Base.Timestamp = ts
291 292
}

S
sunby 已提交
293 294 295 296 297 298
func (cct *CreateCollectionTask) OnEnqueue() error {
	cct.Base = &commonpb.MsgBase{}
	return nil
}

func (cct *CreateCollectionTask) PreExecute(ctx context.Context) error {
299
	cct.Base.MsgType = commonpb.MsgType_CreateCollection
300
	cct.Base.SourceID = Params.ProxyID
301 302 303 304 305 306 307

	cct.schema = &schemapb.CollectionSchema{}
	err := proto.Unmarshal(cct.Schema, cct.schema)
	if err != nil {
		return err
	}

308
	if int64(len(cct.schema.Fields)) > Params.MaxFieldNum {
S
sunby 已提交
309
		return fmt.Errorf("maximum field's number should be limited to %d", Params.MaxFieldNum)
N
neza2017 已提交
310 311 312 313 314 315 316
	}

	// validate collection name
	if err := ValidateCollectionName(cct.schema.Name); err != nil {
		return err
	}

N
neza2017 已提交
317 318 319 320 321 322 323 324
	if err := ValidateDuplicatedFieldName(cct.schema.Fields); err != nil {
		return err
	}

	if err := ValidatePrimaryKey(cct.schema); err != nil {
		return err
	}

N
neza2017 已提交
325 326 327 328 329
	// validate field name
	for _, field := range cct.schema.Fields {
		if err := ValidateFieldName(field.Name); err != nil {
			return err
		}
G
godchen 已提交
330
		if field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_BinaryVector {
N
neza2017 已提交
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
			exist := false
			var dim int64 = 0
			for _, param := range field.TypeParams {
				if param.Key == "dim" {
					exist = true
					tmp, err := strconv.ParseInt(param.Value, 10, 64)
					if err != nil {
						return err
					}
					dim = tmp
					break
				}
			}
			if !exist {
				return errors.New("dimension is not defined in field type params")
			}
G
godchen 已提交
347
			if field.DataType == schemapb.DataType_FloatVector {
N
neza2017 已提交
348 349 350 351 352 353 354 355 356 357 358
				if err := ValidateDimension(dim, false); err != nil {
					return err
				}
			} else {
				if err := ValidateDimension(dim, true); err != nil {
					return err
				}
			}
		}
	}

359
	return nil
Z
zhenshan.cao 已提交
360 361
}

S
sunby 已提交
362
func (cct *CreateCollectionTask) Execute(ctx context.Context) error {
363
	var err error
T
ThreadDao 已提交
364
	cct.result, err = cct.masterService.CreateCollection(ctx, cct.CreateCollectionRequest)
365 366 367
	if err != nil {
		return err
	}
368
	if cct.result.ErrorCode == commonpb.ErrorCode_Success {
G
godchen 已提交
369
		collID, err := globalMetaCache.GetCollectionID(ctx, cct.CollectionName)
370 371 372
		if err != nil {
			return err
		}
G
godchen 已提交
373
		resp, _ := cct.dataServiceClient.GetInsertChannels(ctx, &datapb.GetInsertChannelsRequest{
374
			Base: &commonpb.MsgBase{
375 376 377
				MsgType:   commonpb.MsgType_Insert, // todo
				MsgID:     cct.Base.MsgID,          // todo
				Timestamp: 0,                       // todo
378 379 380
				SourceID:  Params.ProxyID,
			},
			DbID:         0, // todo
G
godchen 已提交
381
			CollectionID: collID,
382
		})
G
godchen 已提交
383 384 385
		if resp == nil {
			return errors.New("get insert channels resp is nil")
		}
386
		if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
387
			return errors.New(resp.Status.Reason)
388
		}
389
		err = globalInsertChannelsMap.CreateInsertMsgStream(collID, resp.Values)
390 391 392 393 394
		if err != nil {
			return err
		}
	}
	return nil
Z
zhenshan.cao 已提交
395 396
}

S
sunby 已提交
397
func (cct *CreateCollectionTask) PostExecute(ctx context.Context) error {
398
	return nil
Z
zhenshan.cao 已提交
399 400
}

401
type DropCollectionTask struct {
D
dragondriver 已提交
402
	Condition
403
	*milvuspb.DropCollectionRequest
T
ThreadDao 已提交
404 405 406
	ctx           context.Context
	masterService types.MasterService
	result        *commonpb.Status
407 408
}

409
func (dct *DropCollectionTask) TraceCtx() context.Context {
S
sunby 已提交
410
	return dct.ctx
411 412
}

C
cai.zhang 已提交
413
func (dct *DropCollectionTask) ID() UniqueID {
414
	return dct.Base.MsgID
415 416
}

417
func (dct *DropCollectionTask) SetID(uid UniqueID) {
418
	dct.Base.MsgID = uid
419 420
}

S
sunby 已提交
421 422 423 424
func (dct *DropCollectionTask) Name() string {
	return DropCollectionTaskName
}

425
func (dct *DropCollectionTask) Type() commonpb.MsgType {
426
	return dct.Base.MsgType
427 428 429
}

func (dct *DropCollectionTask) BeginTs() Timestamp {
430
	return dct.Base.Timestamp
431 432 433
}

func (dct *DropCollectionTask) EndTs() Timestamp {
434
	return dct.Base.Timestamp
435 436 437
}

func (dct *DropCollectionTask) SetTs(ts Timestamp) {
438
	dct.Base.Timestamp = ts
439 440
}

S
sunby 已提交
441 442 443 444 445 446
func (dct *DropCollectionTask) OnEnqueue() error {
	dct.Base = &commonpb.MsgBase{}
	return nil
}

func (dct *DropCollectionTask) PreExecute(ctx context.Context) error {
447
	dct.Base.MsgType = commonpb.MsgType_DropCollection
448
	dct.Base.SourceID = Params.ProxyID
449

450
	if err := ValidateCollectionName(dct.CollectionName); err != nil {
N
neza2017 已提交
451 452
		return err
	}
453 454 455
	return nil
}

S
sunby 已提交
456
func (dct *DropCollectionTask) Execute(ctx context.Context) error {
G
godchen 已提交
457
	collID, err := globalMetaCache.GetCollectionID(ctx, dct.CollectionName)
458 459 460
	if err != nil {
		return err
	}
S
sunby 已提交
461

T
ThreadDao 已提交
462
	dct.result, err = dct.masterService.DropCollection(ctx, dct.DropCollectionRequest)
S
sunby 已提交
463 464
	if err != nil {
		return err
465
	}
S
sunby 已提交
466

467
	err = globalInsertChannelsMap.CloseInsertMsgStream(collID)
G
godchen 已提交
468 469 470
	if err != nil {
		return err
	}
S
sunby 已提交
471

G
godchen 已提交
472
	return nil
473 474
}

S
sunby 已提交
475
func (dct *DropCollectionTask) PostExecute(ctx context.Context) error {
G
godchen 已提交
476
	globalMetaCache.RemoveCollection(ctx, dct.CollectionName)
Z
zhenshan.cao 已提交
477
	return nil
478 479
}

480
type SearchTask struct {
D
dragondriver 已提交
481
	Condition
G
godchen 已提交
482
	*internalpb.SearchRequest
S
sunby 已提交
483
	ctx            context.Context
Z
zhenshan.cao 已提交
484
	queryMsgStream msgstream.MsgStream
G
godchen 已提交
485
	resultBuf      chan []*internalpb.SearchResults
486 487 488 489
	result         *milvuspb.SearchResults
	query          *milvuspb.SearchRequest
}

490
func (st *SearchTask) TraceCtx() context.Context {
S
sunby 已提交
491
	return st.ctx
492 493
}

494 495
func (st *SearchTask) ID() UniqueID {
	return st.Base.MsgID
496 497
}

498 499
func (st *SearchTask) SetID(uid UniqueID) {
	st.Base.MsgID = uid
500 501
}

S
sunby 已提交
502 503 504 505
func (st *SearchTask) Name() string {
	return SearchTaskName
}

506 507
func (st *SearchTask) Type() commonpb.MsgType {
	return st.Base.MsgType
508 509
}

510 511
func (st *SearchTask) BeginTs() Timestamp {
	return st.Base.Timestamp
512 513
}

514 515
func (st *SearchTask) EndTs() Timestamp {
	return st.Base.Timestamp
516 517
}

518 519
func (st *SearchTask) SetTs(ts Timestamp) {
	st.Base.Timestamp = ts
520 521
}

S
sunby 已提交
522
func (st *SearchTask) OnEnqueue() error {
D
dragondriver 已提交
523
	st.Base = &commonpb.MsgBase{}
S
sunby 已提交
524 525 526 527
	return nil
}

func (st *SearchTask) PreExecute(ctx context.Context) error {
528
	st.Base.MsgType = commonpb.MsgType_Search
529
	st.Base.SourceID = Params.ProxyID
530

531
	collectionName := st.query.CollectionName
G
godchen 已提交
532
	_, err := globalMetaCache.GetCollectionID(ctx, collectionName)
533 534 535 536
	if err != nil { // err is not nil if collection not exists
		return err
	}

537
	if err := ValidateCollectionName(st.query.CollectionName); err != nil {
N
neza2017 已提交
538 539 540
		return err
	}

541
	for _, tag := range st.query.PartitionNames {
N
neza2017 已提交
542 543 544 545
		if err := ValidatePartitionTag(tag, false); err != nil {
			return err
		}
	}
546
	st.Base.MsgType = commonpb.MsgType_Search
547
	queryBytes, err := proto.Marshal(st.query)
C
cai.zhang 已提交
548 549 550
	if err != nil {
		return err
	}
551
	st.Query = &commonpb.Blob{
C
cai.zhang 已提交
552 553
		Value: queryBytes,
	}
554 555 556

	st.ResultChannelID = Params.SearchResultChannelNames[0]
	st.DbID = 0 // todo
G
godchen 已提交
557
	collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
558 559 560 561 562
	if err != nil { // err is not nil if collection not exists
		return err
	}
	st.CollectionID = collectionID
	st.PartitionIDs = make([]UniqueID, 0)
Z
zhenshan.cao 已提交
563 564 565 566 567 568 569

	partitionsMap, err := globalMetaCache.GetPartitions(ctx, collectionName)
	if err != nil {
		return err
	}

	partitionsRecord := make(map[UniqueID]bool)
570
	for _, partitionName := range st.query.PartitionNames {
Z
zhenshan.cao 已提交
571 572
		pattern := fmt.Sprintf("^%s$", partitionName)
		re, err := regexp.Compile(pattern)
573
		if err != nil {
Z
zhenshan.cao 已提交
574 575 576 577 578 579 580 581 582 583 584 585 586 587 588
			return errors.New("invalid partition names")
		}
		found := false
		for name, pID := range partitionsMap {
			if re.MatchString(name) {
				if _, exist := partitionsRecord[pID]; !exist {
					st.PartitionIDs = append(st.PartitionIDs, pID)
					partitionsRecord[pID] = true
				}
				found = true
			}
		}
		if !found {
			errMsg := fmt.Sprintf("PartitonName: %s not found", partitionName)
			return errors.New(errMsg)
589 590
		}
	}
Z
zhenshan.cao 已提交
591

592 593 594
	st.Dsl = st.query.Dsl
	st.PlaceholderGroup = st.query.PlaceholderGroup

595 596 597
	return nil
}

S
sunby 已提交
598
func (st *SearchTask) Execute(ctx context.Context) error {
599
	var tsMsg msgstream.TsMsg = &msgstream.SearchMsg{
S
sunby 已提交
600
		SearchRequest: *st.SearchRequest,
601
		BaseMsg: msgstream.BaseMsg{
602
			Ctx:            ctx,
603
			HashValues:     []uint32{uint32(Params.ProxyID)},
604 605
			BeginTimestamp: st.Base.Timestamp,
			EndTimestamp:   st.Base.Timestamp,
606 607
		},
	}
608
	msgPack := msgstream.MsgPack{
609 610
		BeginTs: st.Base.Timestamp,
		EndTs:   st.Base.Timestamp,
X
xige-16 已提交
611
		Msgs:    make([]msgstream.TsMsg, 1),
612
	}
X
xige-16 已提交
613
	msgPack.Msgs[0] = tsMsg
614
	err := st.queryMsgStream.Produce(&msgPack)
615
	log.Debug("proxynode", zap.Int("length of searchMsg", len(msgPack.Msgs)))
C
cai.zhang 已提交
616
	if err != nil {
617
		log.Debug("proxynode", zap.String("send search request failed", err.Error()))
C
cai.zhang 已提交
618 619
	}
	return err
620 621
}

622 623 624 625
// TODO: add benchmark to compare with serial implementation
func decodeSearchResultsParallel(searchResults []*internalpb.SearchResults, maxParallel int) ([][]*milvuspb.Hits, error) {
	log.Debug("decodeSearchResultsParallel", zap.Any("NumOfGoRoutines", maxParallel))

626
	hits := make([][]*milvuspb.Hits, 0)
627
	// necessary to parallel this?
628 629 630 631 632
	for _, partialSearchResult := range searchResults {
		if partialSearchResult.Hits == nil || len(partialSearchResult.Hits) <= 0 {
			continue
		}

633 634
		nq := len(partialSearchResult.Hits)
		partialHits := make([]*milvuspb.Hits, nq)
635

636 637
		f := func(idx int) error {
			partialHit := &milvuspb.Hits{}
638

639 640 641 642
			err := proto.Unmarshal(partialSearchResult.Hits[idx], partialHit)
			if err != nil {
				return err
			}
643

644
			partialHits[idx] = partialHit
645

646
			return nil
647 648
		}

649
		err := funcutil.ProcessFuncParallel(nq, maxParallel, f, "decodePartialSearchResult")
650 651 652 653 654 655 656 657 658 659 660

		if err != nil {
			return nil, err
		}

		hits = append(hits, partialHits)
	}

	return hits, nil
}

661 662 663
// decodeSearchResultsSerial decodes searchResults with a parallelism limit
// of 1 by delegating to decodeSearchResultsParallel.
func decodeSearchResultsSerial(searchResults []*internalpb.SearchResults) ([][]*milvuspb.Hits, error) {
	return decodeSearchResultsParallel(searchResults, 1)
}
664

665 666 667 668
// TODO: add benchmark to compare with serial implementation
func decodeSearchResultsParallelByNq(searchResults []*internalpb.SearchResults) ([][]*milvuspb.Hits, error) {
	if len(searchResults) <= 0 {
		return nil, errors.New("no need to decode empty search results")
669
	}
670 671 672
	nq := len(searchResults[0].Hits)
	return decodeSearchResultsParallel(searchResults, nq)
}
673

674 675 676
// TODO: add benchmark to compare with serial implementation
func decodeSearchResultsParallelByCPU(searchResults []*internalpb.SearchResults) ([][]*milvuspb.Hits, error) {
	return decodeSearchResultsParallel(searchResults, runtime.NumCPU())
677 678 679
}

func decodeSearchResults(searchResults []*internalpb.SearchResults) ([][]*milvuspb.Hits, error) {
680 681 682 683 684
	t := time.Now()
	defer func() {
		log.Debug("decodeSearchResults", zap.Any("time cost", time.Since(t)))
	}()
	return decodeSearchResultsParallelByCPU(searchResults)
685 686
}

687 688 689
func reduceSearchResultsParallel(hits [][]*milvuspb.Hits, nq, availableQueryNodeNum, topk int, metricType string, maxParallel int) *milvuspb.SearchResults {
	log.Debug("reduceSearchResultsParallel", zap.Any("NumOfGoRoutines", maxParallel))

690 691 692 693 694 695 696 697 698
	ret := &milvuspb.SearchResults{
		Status: &commonpb.Status{
			ErrorCode: 0,
		},
		Hits: make([][]byte, nq),
	}

	const minFloat32 = -1 * float32(math.MaxFloat32)

699 700 701 702 703 704 705
	f := func(idx int) error {
		locs := make([]int, availableQueryNodeNum)
		reducedHits := &milvuspb.Hits{
			IDs:     make([]int64, 0),
			RowData: make([][]byte, 0),
			Scores:  make([]float32, 0),
		}
706

707 708 709 710 711 712
		for j := 0; j < topk; j++ {
			valid := false
			choice, maxDistance := 0, minFloat32
			for q, loc := range locs { // query num, the number of ways to merge
				if loc >= len(hits[q][idx].IDs) {
					continue
713
				}
714 715 716 717 718
				distance := hits[q][idx].Scores[loc]
				if distance > maxDistance || (math.Abs(float64(distance-maxDistance)) < math.SmallestNonzeroFloat32 && choice != q) {
					choice = q
					maxDistance = distance
					valid = true
719 720
				}
			}
721 722
			if !valid {
				break
723
			}
724 725 726 727 728
			choiceOffset := locs[choice]
			// check if distance is valid, `invalid` here means very very big,
			// in this process, distance here is the smallest, so the rest of distance are all invalid
			if hits[choice][idx].Scores[choiceOffset] <= minFloat32 {
				break
729
			}
730 731 732
			reducedHits.IDs = append(reducedHits.IDs, hits[choice][idx].IDs[choiceOffset])
			if hits[choice][idx].RowData != nil && len(hits[choice][idx].RowData) > 0 {
				reducedHits.RowData = append(reducedHits.RowData, hits[choice][idx].RowData[choiceOffset])
733
			}
734 735 736
			reducedHits.Scores = append(reducedHits.Scores, hits[choice][idx].Scores[choiceOffset])
			locs[choice]++
		}
737

738 739 740
		if metricType != "IP" {
			for k := range reducedHits.Scores {
				reducedHits.Scores[k] *= -1
741
			}
742
		}
743

744 745 746 747
		reducedHitsBs, err := proto.Marshal(reducedHits)
		if err != nil {
			return err
		}
748

749
		ret.Hits[idx] = reducedHitsBs
750

751
		return nil
752 753
	}

754 755 756 757
	err := funcutil.ProcessFuncParallel(nq, maxParallel, f, "reduceSearchResults")
	if err != nil {
		return nil
	}
758 759 760 761

	return ret
}

762 763 764
// reduceSearchResultsSerial merges hits with a parallelism limit of 1 by
// delegating to reduceSearchResultsParallel.
func reduceSearchResultsSerial(hits [][]*milvuspb.Hits, nq, availableQueryNodeNum, topk int, metricType string) *milvuspb.SearchResults {
	return reduceSearchResultsParallel(hits, nq, availableQueryNodeNum, topk, metricType, 1)
}
765

766 767 768 769
// reduceSearchResultsParallelByNq merges hits using nq as the parallelism
// limit passed to reduceSearchResultsParallel.
//
// TODO: add benchmark to compare with serial implementation
func reduceSearchResultsParallelByNq(hits [][]*milvuspb.Hits, nq, availableQueryNodeNum, topk int, metricType string) *milvuspb.SearchResults {
	return reduceSearchResultsParallel(hits, nq, availableQueryNodeNum, topk, metricType, nq)
}
770

771 772 773
// TODO: add benchmark to compare with serial implementation
func reduceSearchResultsParallelByCPU(hits [][]*milvuspb.Hits, nq, availableQueryNodeNum, topk int, metricType string) *milvuspb.SearchResults {
	return reduceSearchResultsParallel(hits, nq, availableQueryNodeNum, topk, metricType, runtime.NumCPU())
774 775 776
}

func reduceSearchResults(hits [][]*milvuspb.Hits, nq, availableQueryNodeNum, topk int, metricType string) *milvuspb.SearchResults {
777 778 779 780 781
	t := time.Now()
	defer func() {
		log.Debug("reduceSearchResults", zap.Any("time cost", time.Since(t)))
	}()
	return reduceSearchResultsParallelByCPU(hits, nq, availableQueryNodeNum, topk, metricType)
782 783 784 785 786 787 788 789 790 791 792 793 794 795
}

// printSearchResult is a debugging helper: it unmarshals every serialized
// entry of partialSearchResult.Hits into a milvuspb.Hits and prints its IDs
// and Scores to standard output. It panics if an entry cannot be
// unmarshalled.
func printSearchResult(partialSearchResult *internalpb.SearchResults) {
	for i := 0; i < len(partialSearchResult.Hits); i++ {
		testHits := milvuspb.Hits{}
		err := proto.Unmarshal(partialSearchResult.Hits[i], &testHits)
		if err != nil {
			panic(err)
		}
		fmt.Println(testHits.IDs)
		fmt.Println(testHits.Scores)
	}
}

S
sunby 已提交
796
func (st *SearchTask) PostExecute(ctx context.Context) error {
797 798 799 800
	t0 := time.Now()
	defer func() {
		log.Debug("WaitAndPostExecute", zap.Any("time cost", time.Since(t0)))
	}()
801 802
	for {
		select {
803
		case <-st.TraceCtx().Done():
804
			log.Debug("proxynode", zap.Int64("SearchTask: wait to finish failed, timeout!, taskID:", st.ID()))
S
sunby 已提交
805
			return fmt.Errorf("SearchTask:wait to finish failed, timeout: %d", st.ID())
806
		case searchResults := <-st.resultBuf:
807
			// fmt.Println("searchResults: ", searchResults)
G
godchen 已提交
808
			filterSearchResult := make([]*internalpb.SearchResults, 0)
Z
zhenshan.cao 已提交
809
			var filterReason string
810
			for _, partialSearchResult := range searchResults {
811
				if partialSearchResult.Status.ErrorCode == commonpb.ErrorCode_Success {
812
					filterSearchResult = append(filterSearchResult, partialSearchResult)
813
					// For debugging, please don't delete.
814
					// printSearchResult(partialSearchResult)
Z
zhenshan.cao 已提交
815 816
				} else {
					filterReason += partialSearchResult.Status.Reason + "\n"
817 818 819
				}
			}

820 821
			availableQueryNodeNum := len(filterSearchResult)
			if availableQueryNodeNum <= 0 {
822
				st.result = &milvuspb.SearchResults{
Z
zhenshan.cao 已提交
823
					Status: &commonpb.Status{
824
						ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
825 826 827 828
						Reason:    filterReason,
					},
				}
				return errors.New(filterReason)
829
			}
C
cai.zhang 已提交
830

831
			availableQueryNodeNum = 0
832
			for _, partialSearchResult := range filterSearchResult {
B
bigsheeper 已提交
833
				if partialSearchResult.Hits == nil || len(partialSearchResult.Hits) <= 0 {
834 835 836
					filterReason += "nq is zero\n"
					continue
				}
837
				availableQueryNodeNum++
838 839 840
			}

			if availableQueryNodeNum <= 0 {
841
				st.result = &milvuspb.SearchResults{
842
					Status: &commonpb.Status{
843
						ErrorCode: commonpb.ErrorCode_Success,
844 845 846 847 848 849
						Reason:    filterReason,
					},
				}
				return nil
			}

850 851 852 853 854
			hits, err := decodeSearchResults(filterSearchResult)
			if err != nil {
				return err
			}

855 856
			nq := len(hits[0])
			if nq <= 0 {
857
				st.result = &milvuspb.SearchResults{
858
					Status: &commonpb.Status{
859
						ErrorCode: commonpb.ErrorCode_Success,
860 861
						Reason:    filterReason,
					},
N
neza2017 已提交
862
				}
863
				return nil
N
neza2017 已提交
864
			}
C
cai.zhang 已提交
865

B
bigsheeper 已提交
866 867 868 869
			topk := 0
			for _, hit := range hits {
				topk = getMax(topk, len(hit[0].IDs))
			}
C
cai.zhang 已提交
870

871
			st.result = reduceSearchResults(hits, nq, availableQueryNodeNum, topk, searchResults[0].MetricType)
C
cai.zhang 已提交
872 873

			return nil
874 875
		}
	}
D
dragondriver 已提交
876 877
}

878
type HasCollectionTask struct {
D
dragondriver 已提交
879
	Condition
880
	*milvuspb.HasCollectionRequest
T
ThreadDao 已提交
881 882 883
	ctx           context.Context
	masterService types.MasterService
	result        *milvuspb.BoolResponse
884 885
}

886
func (hct *HasCollectionTask) TraceCtx() context.Context {
S
sunby 已提交
887
	return hct.ctx
888 889
}

C
cai.zhang 已提交
890
func (hct *HasCollectionTask) ID() UniqueID {
891
	return hct.Base.MsgID
892 893
}

894
func (hct *HasCollectionTask) SetID(uid UniqueID) {
895
	hct.Base.MsgID = uid
896 897
}

S
sunby 已提交
898 899 900 901
func (hct *HasCollectionTask) Name() string {
	return HasCollectionTaskName
}

902
func (hct *HasCollectionTask) Type() commonpb.MsgType {
903
	return hct.Base.MsgType
904 905 906
}

func (hct *HasCollectionTask) BeginTs() Timestamp {
907
	return hct.Base.Timestamp
908 909 910
}

func (hct *HasCollectionTask) EndTs() Timestamp {
911
	return hct.Base.Timestamp
912 913 914
}

func (hct *HasCollectionTask) SetTs(ts Timestamp) {
915
	hct.Base.Timestamp = ts
916 917
}

S
sunby 已提交
918 919 920 921 922 923
func (hct *HasCollectionTask) OnEnqueue() error {
	hct.Base = &commonpb.MsgBase{}
	return nil
}

func (hct *HasCollectionTask) PreExecute(ctx context.Context) error {
924
	hct.Base.MsgType = commonpb.MsgType_HasCollection
925
	hct.Base.SourceID = Params.ProxyID
926

927
	if err := ValidateCollectionName(hct.CollectionName); err != nil {
N
neza2017 已提交
928 929
		return err
	}
930 931 932
	return nil
}

S
sunby 已提交
933
func (hct *HasCollectionTask) Execute(ctx context.Context) error {
934
	var err error
T
ThreadDao 已提交
935
	hct.result, err = hct.masterService.HasCollection(ctx, hct.HasCollectionRequest)
G
godchen 已提交
936 937 938
	if hct.result == nil {
		return errors.New("has collection resp is nil")
	}
939
	if hct.result.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
940 941
		return errors.New(hct.result.Status.Reason)
	}
942 943 944
	return err
}

S
sunby 已提交
945
func (hct *HasCollectionTask) PostExecute(ctx context.Context) error {
946 947 948 949
	return nil
}

type DescribeCollectionTask struct {
D
dragondriver 已提交
950
	Condition
951
	*milvuspb.DescribeCollectionRequest
T
ThreadDao 已提交
952 953 954
	ctx           context.Context
	masterService types.MasterService
	result        *milvuspb.DescribeCollectionResponse
955 956
}

957
func (dct *DescribeCollectionTask) TraceCtx() context.Context {
S
sunby 已提交
958
	return dct.ctx
959 960
}

C
cai.zhang 已提交
961
func (dct *DescribeCollectionTask) ID() UniqueID {
962
	return dct.Base.MsgID
963 964
}

965
func (dct *DescribeCollectionTask) SetID(uid UniqueID) {
966
	dct.Base.MsgID = uid
967 968
}

S
sunby 已提交
969 970 971 972
// Name returns the task name constant DescribeCollectionTaskName.
func (dct *DescribeCollectionTask) Name() string {
	return DescribeCollectionTaskName
}

973
func (dct *DescribeCollectionTask) Type() commonpb.MsgType {
974
	return dct.Base.MsgType
975 976 977
}

func (dct *DescribeCollectionTask) BeginTs() Timestamp {
978
	return dct.Base.Timestamp
979 980 981
}

func (dct *DescribeCollectionTask) EndTs() Timestamp {
982
	return dct.Base.Timestamp
983 984 985
}

func (dct *DescribeCollectionTask) SetTs(ts Timestamp) {
986
	dct.Base.Timestamp = ts
987 988
}

S
sunby 已提交
989 990 991 992 993 994
// OnEnqueue replaces dct.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (dct *DescribeCollectionTask) OnEnqueue() error {
	dct.Base = new(commonpb.MsgBase)
	return nil
}

func (dct *DescribeCollectionTask) PreExecute(ctx context.Context) error {
995
	dct.Base.MsgType = commonpb.MsgType_DescribeCollection
996
	dct.Base.SourceID = Params.ProxyID
997

998
	if err := ValidateCollectionName(dct.CollectionName); err != nil {
N
neza2017 已提交
999 1000
		return err
	}
1001 1002 1003
	return nil
}

S
sunby 已提交
1004
func (dct *DescribeCollectionTask) Execute(ctx context.Context) error {
1005
	var err error
T
ThreadDao 已提交
1006
	dct.result, err = dct.masterService.DescribeCollection(ctx, dct.DescribeCollectionRequest)
G
godchen 已提交
1007 1008
	if dct.result == nil {
		return errors.New("has collection resp is nil")
1009
	}
1010
	if dct.result.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1011 1012 1013
		return errors.New(dct.result.Status.Reason)
	}
	return err
1014 1015
}

S
sunby 已提交
1016
func (dct *DescribeCollectionTask) PostExecute(ctx context.Context) error {
1017 1018 1019 1020 1021
	return nil
}

type GetCollectionsStatisticsTask struct {
	Condition
G
godchen 已提交
1022
	*milvuspb.GetCollectionStatisticsRequest
T
ThreadDao 已提交
1023 1024
	ctx         context.Context
	dataService types.DataService
G
godchen 已提交
1025
	result      *milvuspb.GetCollectionStatisticsResponse
1026 1027
}

1028
func (g *GetCollectionsStatisticsTask) TraceCtx() context.Context {
S
sunby 已提交
1029 1030 1031
	return g.ctx
}

1032 1033 1034 1035 1036 1037 1038 1039
// ID returns the message ID held in g.Base.
func (g *GetCollectionsStatisticsTask) ID() UniqueID {
	return g.Base.MsgID
}

// SetID stores uid as the message ID in g.Base.
func (g *GetCollectionsStatisticsTask) SetID(uid UniqueID) {
	g.Base.MsgID = uid
}

S
sunby 已提交
1040 1041 1042 1043
// Name returns the task name constant GetCollectionStatisticsTaskName.
func (g *GetCollectionsStatisticsTask) Name() string {
	return GetCollectionStatisticsTaskName
}

1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064
// Type returns the message type held in g.Base.
func (g *GetCollectionsStatisticsTask) Type() commonpb.MsgType {
	return g.Base.MsgType
}

// BeginTs returns the timestamp held in g.Base.
func (g *GetCollectionsStatisticsTask) BeginTs() Timestamp {
	return g.Base.Timestamp
}

// EndTs returns the timestamp held in g.Base (the same field BeginTs reads).
func (g *GetCollectionsStatisticsTask) EndTs() Timestamp {
	return g.Base.Timestamp
}

// SetTs stores ts as the timestamp in g.Base.
func (g *GetCollectionsStatisticsTask) SetTs(ts Timestamp) {
	g.Base.Timestamp = ts
}

// OnEnqueue replaces g.Base with a freshly allocated, zero-valued MsgBase and
// always returns nil.
func (g *GetCollectionsStatisticsTask) OnEnqueue() error {
	g.Base = new(commonpb.MsgBase)
	return nil
}

S
sunby 已提交
1065
func (g *GetCollectionsStatisticsTask) PreExecute(ctx context.Context) error {
1066
	g.Base.MsgType = commonpb.MsgType_GetCollectionStatistics
1067 1068 1069 1070
	g.Base.SourceID = Params.ProxyID
	return nil
}

S
sunby 已提交
1071
func (g *GetCollectionsStatisticsTask) Execute(ctx context.Context) error {
G
godchen 已提交
1072
	collID, err := globalMetaCache.GetCollectionID(ctx, g.CollectionName)
1073 1074 1075
	if err != nil {
		return err
	}
G
godchen 已提交
1076
	req := &datapb.GetCollectionStatisticsRequest{
1077
		Base: &commonpb.MsgBase{
1078
			MsgType:   commonpb.MsgType_GetCollectionStatistics,
1079 1080 1081 1082 1083 1084 1085
			MsgID:     g.Base.MsgID,
			Timestamp: g.Base.Timestamp,
			SourceID:  g.Base.SourceID,
		},
		CollectionID: collID,
	}

T
ThreadDao 已提交
1086
	result, _ := g.dataService.GetCollectionStatistics(ctx, req)
G
godchen 已提交
1087 1088 1089
	if result == nil {
		return errors.New("get collection statistics resp is nil")
	}
1090
	if result.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1091
		return errors.New(result.Status.Reason)
1092
	}
G
godchen 已提交
1093
	g.result = &milvuspb.GetCollectionStatisticsResponse{
1094
		Status: &commonpb.Status{
1095
			ErrorCode: commonpb.ErrorCode_Success,
1096 1097 1098 1099 1100 1101 1102
			Reason:    "",
		},
		Stats: result.Stats,
	}
	return nil
}

S
sunby 已提交
1103
func (g *GetCollectionsStatisticsTask) PostExecute(ctx context.Context) error {
1104 1105 1106 1107
	return nil
}

type ShowCollectionsTask struct {
D
dragondriver 已提交
1108
	Condition
G
godchen 已提交
1109
	*milvuspb.ShowCollectionsRequest
T
ThreadDao 已提交
1110 1111
	ctx           context.Context
	masterService types.MasterService
G
godchen 已提交
1112
	result        *milvuspb.ShowCollectionsResponse
1113 1114
}

1115
func (sct *ShowCollectionsTask) TraceCtx() context.Context {
S
sunby 已提交
1116
	return sct.ctx
1117 1118
}

C
cai.zhang 已提交
1119
func (sct *ShowCollectionsTask) ID() UniqueID {
1120
	return sct.Base.MsgID
1121 1122
}

1123
func (sct *ShowCollectionsTask) SetID(uid UniqueID) {
1124
	sct.Base.MsgID = uid
1125 1126
}

S
sunby 已提交
1127 1128 1129 1130
// Name returns the task name constant ShowCollectionTaskName.
func (sct *ShowCollectionsTask) Name() string {
	return ShowCollectionTaskName
}

1131
func (sct *ShowCollectionsTask) Type() commonpb.MsgType {
1132
	return sct.Base.MsgType
1133 1134 1135
}

func (sct *ShowCollectionsTask) BeginTs() Timestamp {
1136
	return sct.Base.Timestamp
1137 1138 1139
}

func (sct *ShowCollectionsTask) EndTs() Timestamp {
1140
	return sct.Base.Timestamp
1141 1142 1143
}

func (sct *ShowCollectionsTask) SetTs(ts Timestamp) {
1144
	sct.Base.Timestamp = ts
1145 1146
}

S
sunby 已提交
1147 1148 1149 1150 1151 1152
// OnEnqueue replaces sct.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (sct *ShowCollectionsTask) OnEnqueue() error {
	sct.Base = new(commonpb.MsgBase)
	return nil
}

func (sct *ShowCollectionsTask) PreExecute(ctx context.Context) error {
1153
	sct.Base.MsgType = commonpb.MsgType_ShowCollections
1154
	sct.Base.SourceID = Params.ProxyID
1155

1156 1157 1158
	return nil
}

S
sunby 已提交
1159
func (sct *ShowCollectionsTask) Execute(ctx context.Context) error {
1160
	var err error
G
godchen 已提交
1161
	sct.result, err = sct.masterService.ShowCollections(ctx, sct.ShowCollectionsRequest)
G
godchen 已提交
1162 1163 1164
	if sct.result == nil {
		return errors.New("get collection statistics resp is nil")
	}
1165
	if sct.result.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1166 1167
		return errors.New(sct.result.Status.Reason)
	}
1168 1169 1170
	return err
}

S
sunby 已提交
1171
func (sct *ShowCollectionsTask) PostExecute(ctx context.Context) error {
1172 1173
	return nil
}
N
neza2017 已提交
1174 1175 1176

type CreatePartitionTask struct {
	Condition
1177
	*milvuspb.CreatePartitionRequest
T
ThreadDao 已提交
1178 1179 1180
	ctx           context.Context
	masterService types.MasterService
	result        *commonpb.Status
N
neza2017 已提交
1181 1182
}

1183
func (cpt *CreatePartitionTask) TraceCtx() context.Context {
S
sunby 已提交
1184
	return cpt.ctx
1185 1186
}

N
neza2017 已提交
1187
func (cpt *CreatePartitionTask) ID() UniqueID {
1188
	return cpt.Base.MsgID
N
neza2017 已提交
1189 1190
}

1191
func (cpt *CreatePartitionTask) SetID(uid UniqueID) {
1192
	cpt.Base.MsgID = uid
1193 1194
}

S
sunby 已提交
1195 1196 1197 1198
// Name returns the task name constant CreatePartitionTaskName.
func (cpt *CreatePartitionTask) Name() string {
	return CreatePartitionTaskName
}

1199
func (cpt *CreatePartitionTask) Type() commonpb.MsgType {
1200
	return cpt.Base.MsgType
N
neza2017 已提交
1201 1202 1203
}

func (cpt *CreatePartitionTask) BeginTs() Timestamp {
1204
	return cpt.Base.Timestamp
N
neza2017 已提交
1205 1206 1207
}

func (cpt *CreatePartitionTask) EndTs() Timestamp {
1208
	return cpt.Base.Timestamp
N
neza2017 已提交
1209 1210 1211
}

func (cpt *CreatePartitionTask) SetTs(ts Timestamp) {
1212
	cpt.Base.Timestamp = ts
N
neza2017 已提交
1213 1214
}

S
sunby 已提交
1215 1216 1217 1218 1219 1220
// OnEnqueue replaces cpt.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (cpt *CreatePartitionTask) OnEnqueue() error {
	cpt.Base = new(commonpb.MsgBase)
	return nil
}

func (cpt *CreatePartitionTask) PreExecute(ctx context.Context) error {
1221
	cpt.Base.MsgType = commonpb.MsgType_CreatePartition
1222
	cpt.Base.SourceID = Params.ProxyID
1223

1224
	collName, partitionTag := cpt.CollectionName, cpt.PartitionName
N
neza2017 已提交
1225 1226 1227 1228 1229 1230 1231 1232 1233

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	if err := ValidatePartitionTag(partitionTag, true); err != nil {
		return err
	}

N
neza2017 已提交
1234 1235 1236
	return nil
}

S
sunby 已提交
1237
func (cpt *CreatePartitionTask) Execute(ctx context.Context) (err error) {
T
ThreadDao 已提交
1238
	cpt.result, err = cpt.masterService.CreatePartition(ctx, cpt.CreatePartitionRequest)
G
godchen 已提交
1239 1240 1241
	if cpt.result == nil {
		return errors.New("get collection statistics resp is nil")
	}
1242
	if cpt.result.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1243 1244
		return errors.New(cpt.result.Reason)
	}
N
neza2017 已提交
1245 1246 1247
	return err
}

S
sunby 已提交
1248
func (cpt *CreatePartitionTask) PostExecute(ctx context.Context) error {
N
neza2017 已提交
1249 1250 1251 1252 1253
	return nil
}

type DropPartitionTask struct {
	Condition
1254
	*milvuspb.DropPartitionRequest
T
ThreadDao 已提交
1255 1256 1257
	ctx           context.Context
	masterService types.MasterService
	result        *commonpb.Status
N
neza2017 已提交
1258 1259
}

1260
func (dpt *DropPartitionTask) TraceCtx() context.Context {
S
sunby 已提交
1261
	return dpt.ctx
1262 1263
}

N
neza2017 已提交
1264
func (dpt *DropPartitionTask) ID() UniqueID {
1265
	return dpt.Base.MsgID
N
neza2017 已提交
1266 1267
}

1268
func (dpt *DropPartitionTask) SetID(uid UniqueID) {
1269
	dpt.Base.MsgID = uid
1270 1271
}

S
sunby 已提交
1272 1273 1274 1275
// Name returns the task name constant DropPartitionTaskName.
func (dpt *DropPartitionTask) Name() string {
	return DropPartitionTaskName
}

1276
func (dpt *DropPartitionTask) Type() commonpb.MsgType {
1277
	return dpt.Base.MsgType
N
neza2017 已提交
1278 1279 1280
}

func (dpt *DropPartitionTask) BeginTs() Timestamp {
1281
	return dpt.Base.Timestamp
N
neza2017 已提交
1282 1283 1284
}

func (dpt *DropPartitionTask) EndTs() Timestamp {
1285
	return dpt.Base.Timestamp
N
neza2017 已提交
1286 1287 1288
}

func (dpt *DropPartitionTask) SetTs(ts Timestamp) {
1289
	dpt.Base.Timestamp = ts
N
neza2017 已提交
1290 1291
}

S
sunby 已提交
1292 1293 1294 1295 1296 1297
// OnEnqueue replaces dpt.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (dpt *DropPartitionTask) OnEnqueue() error {
	dpt.Base = new(commonpb.MsgBase)
	return nil
}

func (dpt *DropPartitionTask) PreExecute(ctx context.Context) error {
1298
	dpt.Base.MsgType = commonpb.MsgType_DropPartition
1299
	dpt.Base.SourceID = Params.ProxyID
1300

1301
	collName, partitionTag := dpt.CollectionName, dpt.PartitionName
N
neza2017 已提交
1302 1303 1304 1305 1306 1307 1308 1309 1310

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	if err := ValidatePartitionTag(partitionTag, true); err != nil {
		return err
	}

N
neza2017 已提交
1311 1312 1313
	return nil
}

S
sunby 已提交
1314
func (dpt *DropPartitionTask) Execute(ctx context.Context) (err error) {
T
ThreadDao 已提交
1315
	dpt.result, err = dpt.masterService.DropPartition(ctx, dpt.DropPartitionRequest)
G
godchen 已提交
1316 1317 1318
	if dpt.result == nil {
		return errors.New("get collection statistics resp is nil")
	}
1319
	if dpt.result.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1320 1321
		return errors.New(dpt.result.Reason)
	}
N
neza2017 已提交
1322 1323 1324
	return err
}

S
sunby 已提交
1325
func (dpt *DropPartitionTask) PostExecute(ctx context.Context) error {
N
neza2017 已提交
1326 1327 1328 1329 1330
	return nil
}

type HasPartitionTask struct {
	Condition
1331
	*milvuspb.HasPartitionRequest
T
ThreadDao 已提交
1332 1333 1334
	ctx           context.Context
	masterService types.MasterService
	result        *milvuspb.BoolResponse
N
neza2017 已提交
1335 1336
}

1337
func (hpt *HasPartitionTask) TraceCtx() context.Context {
S
sunby 已提交
1338
	return hpt.ctx
1339 1340
}

N
neza2017 已提交
1341
func (hpt *HasPartitionTask) ID() UniqueID {
1342
	return hpt.Base.MsgID
N
neza2017 已提交
1343 1344
}

1345
func (hpt *HasPartitionTask) SetID(uid UniqueID) {
1346
	hpt.Base.MsgID = uid
1347 1348
}

S
sunby 已提交
1349 1350 1351 1352
// Name returns the task name constant HasPartitionTaskName.
func (hpt *HasPartitionTask) Name() string {
	return HasPartitionTaskName
}

1353
func (hpt *HasPartitionTask) Type() commonpb.MsgType {
1354
	return hpt.Base.MsgType
N
neza2017 已提交
1355 1356 1357
}

func (hpt *HasPartitionTask) BeginTs() Timestamp {
1358
	return hpt.Base.Timestamp
N
neza2017 已提交
1359 1360 1361
}

func (hpt *HasPartitionTask) EndTs() Timestamp {
1362
	return hpt.Base.Timestamp
N
neza2017 已提交
1363 1364 1365
}

func (hpt *HasPartitionTask) SetTs(ts Timestamp) {
1366
	hpt.Base.Timestamp = ts
N
neza2017 已提交
1367 1368
}

S
sunby 已提交
1369 1370 1371 1372 1373 1374
// OnEnqueue replaces hpt.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (hpt *HasPartitionTask) OnEnqueue() error {
	hpt.Base = new(commonpb.MsgBase)
	return nil
}

func (hpt *HasPartitionTask) PreExecute(ctx context.Context) error {
1375
	hpt.Base.MsgType = commonpb.MsgType_HasPartition
1376
	hpt.Base.SourceID = Params.ProxyID
1377

1378
	collName, partitionTag := hpt.CollectionName, hpt.PartitionName
N
neza2017 已提交
1379 1380 1381 1382 1383 1384 1385 1386

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	if err := ValidatePartitionTag(partitionTag, true); err != nil {
		return err
	}
N
neza2017 已提交
1387 1388 1389
	return nil
}

S
sunby 已提交
1390
func (hpt *HasPartitionTask) Execute(ctx context.Context) (err error) {
T
ThreadDao 已提交
1391
	hpt.result, err = hpt.masterService.HasPartition(ctx, hpt.HasPartitionRequest)
G
godchen 已提交
1392 1393 1394
	if hpt.result == nil {
		return errors.New("get collection statistics resp is nil")
	}
1395
	if hpt.result.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1396 1397
		return errors.New(hpt.result.Status.Reason)
	}
N
neza2017 已提交
1398 1399 1400
	return err
}

S
sunby 已提交
1401
func (hpt *HasPartitionTask) PostExecute(ctx context.Context) error {
N
neza2017 已提交
1402 1403 1404 1405 1406
	return nil
}

type ShowPartitionsTask struct {
	Condition
G
godchen 已提交
1407
	*milvuspb.ShowPartitionsRequest
T
ThreadDao 已提交
1408 1409
	ctx           context.Context
	masterService types.MasterService
G
godchen 已提交
1410
	result        *milvuspb.ShowPartitionsResponse
N
neza2017 已提交
1411 1412
}

1413
func (spt *ShowPartitionsTask) TraceCtx() context.Context {
S
sunby 已提交
1414
	return spt.ctx
1415 1416
}

N
neza2017 已提交
1417
func (spt *ShowPartitionsTask) ID() UniqueID {
1418
	return spt.Base.MsgID
N
neza2017 已提交
1419 1420
}

1421
func (spt *ShowPartitionsTask) SetID(uid UniqueID) {
1422
	spt.Base.MsgID = uid
1423 1424
}

S
sunby 已提交
1425 1426 1427 1428
// Name returns the task name constant ShowPartitionTaskName.
func (spt *ShowPartitionsTask) Name() string {
	return ShowPartitionTaskName
}

1429
func (spt *ShowPartitionsTask) Type() commonpb.MsgType {
1430
	return spt.Base.MsgType
N
neza2017 已提交
1431 1432 1433
}

func (spt *ShowPartitionsTask) BeginTs() Timestamp {
1434
	return spt.Base.Timestamp
N
neza2017 已提交
1435 1436 1437
}

func (spt *ShowPartitionsTask) EndTs() Timestamp {
1438
	return spt.Base.Timestamp
N
neza2017 已提交
1439 1440 1441
}

func (spt *ShowPartitionsTask) SetTs(ts Timestamp) {
1442
	spt.Base.Timestamp = ts
N
neza2017 已提交
1443 1444
}

S
sunby 已提交
1445 1446 1447 1448 1449 1450
// OnEnqueue replaces spt.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (spt *ShowPartitionsTask) OnEnqueue() error {
	spt.Base = new(commonpb.MsgBase)
	return nil
}

func (spt *ShowPartitionsTask) PreExecute(ctx context.Context) error {
1451
	spt.Base.MsgType = commonpb.MsgType_ShowPartitions
1452
	spt.Base.SourceID = Params.ProxyID
1453

1454
	if err := ValidateCollectionName(spt.CollectionName); err != nil {
N
neza2017 已提交
1455 1456
		return err
	}
N
neza2017 已提交
1457 1458 1459
	return nil
}

S
sunby 已提交
1460
func (spt *ShowPartitionsTask) Execute(ctx context.Context) error {
1461
	var err error
G
godchen 已提交
1462
	spt.result, err = spt.masterService.ShowPartitions(ctx, spt.ShowPartitionsRequest)
G
godchen 已提交
1463 1464
	if spt.result == nil {
		return errors.New("get collection statistics resp is nil")
G
godchen 已提交
1465
	}
1466
	if spt.result.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1467 1468 1469
		return errors.New(spt.result.Status.Reason)
	}
	return err
N
neza2017 已提交
1470 1471
}

S
sunby 已提交
1472
func (spt *ShowPartitionsTask) PostExecute(ctx context.Context) error {
N
neza2017 已提交
1473 1474
	return nil
}
1475 1476 1477

type CreateIndexTask struct {
	Condition
1478
	*milvuspb.CreateIndexRequest
T
ThreadDao 已提交
1479 1480 1481
	ctx           context.Context
	masterService types.MasterService
	result        *commonpb.Status
1482 1483
}

1484
func (cit *CreateIndexTask) TraceCtx() context.Context {
S
sunby 已提交
1485
	return cit.ctx
1486 1487
}

1488
func (cit *CreateIndexTask) ID() UniqueID {
1489
	return cit.Base.MsgID
1490 1491 1492
}

func (cit *CreateIndexTask) SetID(uid UniqueID) {
1493
	cit.Base.MsgID = uid
1494 1495
}

S
sunby 已提交
1496 1497 1498 1499
// Name returns the task name constant CreateIndexTaskName.
func (cit *CreateIndexTask) Name() string {
	return CreateIndexTaskName
}

1500
func (cit *CreateIndexTask) Type() commonpb.MsgType {
1501
	return cit.Base.MsgType
1502 1503 1504
}

func (cit *CreateIndexTask) BeginTs() Timestamp {
1505
	return cit.Base.Timestamp
1506 1507 1508
}

func (cit *CreateIndexTask) EndTs() Timestamp {
1509
	return cit.Base.Timestamp
1510 1511 1512
}

func (cit *CreateIndexTask) SetTs(ts Timestamp) {
1513
	cit.Base.Timestamp = ts
1514 1515
}

S
sunby 已提交
1516 1517 1518 1519 1520 1521
// OnEnqueue replaces cit.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (cit *CreateIndexTask) OnEnqueue() error {
	cit.Base = new(commonpb.MsgBase)
	return nil
}

func (cit *CreateIndexTask) PreExecute(ctx context.Context) error {
1522
	cit.Base.MsgType = commonpb.MsgType_CreateIndex
1523
	cit.Base.SourceID = Params.ProxyID
1524

1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537
	collName, fieldName := cit.CollectionName, cit.FieldName

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	if err := ValidateFieldName(fieldName); err != nil {
		return err
	}

	return nil
}

S
sunby 已提交
1538
func (cit *CreateIndexTask) Execute(ctx context.Context) error {
G
godchen 已提交
1539
	var err error
T
ThreadDao 已提交
1540
	cit.result, err = cit.masterService.CreateIndex(ctx, cit.CreateIndexRequest)
G
godchen 已提交
1541 1542 1543
	if cit.result == nil {
		return errors.New("get collection statistics resp is nil")
	}
1544
	if cit.result.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1545 1546
		return errors.New(cit.result.Reason)
	}
1547 1548 1549
	return err
}

S
sunby 已提交
1550
func (cit *CreateIndexTask) PostExecute(ctx context.Context) error {
1551 1552 1553 1554 1555
	return nil
}

type DescribeIndexTask struct {
	Condition
1556
	*milvuspb.DescribeIndexRequest
T
ThreadDao 已提交
1557 1558 1559
	ctx           context.Context
	masterService types.MasterService
	result        *milvuspb.DescribeIndexResponse
1560 1561
}

1562
func (dit *DescribeIndexTask) TraceCtx() context.Context {
S
sunby 已提交
1563
	return dit.ctx
1564 1565
}

1566
func (dit *DescribeIndexTask) ID() UniqueID {
1567
	return dit.Base.MsgID
1568 1569 1570
}

func (dit *DescribeIndexTask) SetID(uid UniqueID) {
1571
	dit.Base.MsgID = uid
1572 1573
}

S
sunby 已提交
1574 1575 1576 1577
// Name returns the task name constant DescribeIndexTaskName.
func (dit *DescribeIndexTask) Name() string {
	return DescribeIndexTaskName
}

1578
func (dit *DescribeIndexTask) Type() commonpb.MsgType {
1579
	return dit.Base.MsgType
1580 1581 1582
}

func (dit *DescribeIndexTask) BeginTs() Timestamp {
1583
	return dit.Base.Timestamp
1584 1585 1586
}

func (dit *DescribeIndexTask) EndTs() Timestamp {
1587
	return dit.Base.Timestamp
1588 1589 1590
}

func (dit *DescribeIndexTask) SetTs(ts Timestamp) {
1591
	dit.Base.Timestamp = ts
1592 1593
}

S
sunby 已提交
1594 1595 1596 1597 1598 1599
// OnEnqueue replaces dit.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (dit *DescribeIndexTask) OnEnqueue() error {
	dit.Base = new(commonpb.MsgBase)
	return nil
}

func (dit *DescribeIndexTask) PreExecute(ctx context.Context) error {
1600
	dit.Base.MsgType = commonpb.MsgType_DescribeIndex
1601
	dit.Base.SourceID = Params.ProxyID
1602

1603 1604 1605 1606 1607 1608 1609 1610 1611 1612
	collName, fieldName := dit.CollectionName, dit.FieldName

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	if err := ValidateFieldName(fieldName); err != nil {
		return err
	}

Z
zhenshan.cao 已提交
1613 1614 1615 1616 1617
	// only support default index name for now. @2021.02.18
	if dit.IndexName == "" {
		dit.IndexName = Params.DefaultIndexName
	}

1618 1619 1620
	return nil
}

S
sunby 已提交
1621
func (dit *DescribeIndexTask) Execute(ctx context.Context) error {
1622
	var err error
T
ThreadDao 已提交
1623
	dit.result, err = dit.masterService.DescribeIndex(ctx, dit.DescribeIndexRequest)
G
godchen 已提交
1624 1625 1626
	if dit.result == nil {
		return errors.New("get collection statistics resp is nil")
	}
1627
	if dit.result.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1628 1629
		return errors.New(dit.result.Status.Reason)
	}
1630 1631 1632
	return err
}

S
sunby 已提交
1633
func (dit *DescribeIndexTask) PostExecute(ctx context.Context) error {
1634 1635 1636
	return nil
}

B
BossZou 已提交
1637 1638
type DropIndexTask struct {
	Condition
S
sunby 已提交
1639
	ctx context.Context
B
BossZou 已提交
1640
	*milvuspb.DropIndexRequest
T
ThreadDao 已提交
1641 1642
	masterService types.MasterService
	result        *commonpb.Status
B
BossZou 已提交
1643 1644
}

1645
func (dit *DropIndexTask) TraceCtx() context.Context {
S
sunby 已提交
1646
	return dit.ctx
B
BossZou 已提交
1647 1648 1649 1650 1651 1652 1653 1654 1655 1656
}

// ID returns the message ID held in dit.Base.
func (dit *DropIndexTask) ID() UniqueID {
	return dit.Base.MsgID
}

// SetID stores uid as the message ID in dit.Base.
func (dit *DropIndexTask) SetID(uid UniqueID) {
	dit.Base.MsgID = uid
}

S
sunby 已提交
1657 1658 1659 1660
// Name returns the task name constant DropIndexTaskName.
func (dit *DropIndexTask) Name() string {
	return DropIndexTaskName
}

B
BossZou 已提交
1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676
// Type returns the message type held in dit.Base.
func (dit *DropIndexTask) Type() commonpb.MsgType {
	return dit.Base.MsgType
}

// BeginTs returns the timestamp held in dit.Base.
func (dit *DropIndexTask) BeginTs() Timestamp {
	return dit.Base.Timestamp
}

// EndTs returns the timestamp held in dit.Base (the same field BeginTs reads).
func (dit *DropIndexTask) EndTs() Timestamp {
	return dit.Base.Timestamp
}

// SetTs stores ts as the timestamp in dit.Base.
func (dit *DropIndexTask) SetTs(ts Timestamp) {
	dit.Base.Timestamp = ts
}

S
sunby 已提交
1677 1678 1679 1680 1681 1682
// OnEnqueue replaces dit.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (dit *DropIndexTask) OnEnqueue() error {
	dit.Base = new(commonpb.MsgBase)
	return nil
}

func (dit *DropIndexTask) PreExecute(ctx context.Context) error {
1683
	dit.Base.MsgType = commonpb.MsgType_DropIndex
B
BossZou 已提交
1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698
	dit.Base.SourceID = Params.ProxyID

	collName, fieldName := dit.CollectionName, dit.FieldName

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	if err := ValidateFieldName(fieldName); err != nil {
		return err
	}

	return nil
}

S
sunby 已提交
1699
func (dit *DropIndexTask) Execute(ctx context.Context) error {
B
BossZou 已提交
1700
	var err error
T
ThreadDao 已提交
1701
	dit.result, err = dit.masterService.DropIndex(ctx, dit.DropIndexRequest)
B
BossZou 已提交
1702 1703 1704
	if dit.result == nil {
		return errors.New("drop index resp is nil")
	}
1705
	if dit.result.ErrorCode != commonpb.ErrorCode_Success {
B
BossZou 已提交
1706 1707 1708 1709 1710
		return errors.New(dit.result.Reason)
	}
	return err
}

S
sunby 已提交
1711
func (dit *DropIndexTask) PostExecute(ctx context.Context) error {
B
BossZou 已提交
1712 1713 1714
	return nil
}

1715
type GetIndexStateTask struct {
1716
	Condition
G
godchen 已提交
1717
	*milvuspb.GetIndexStateRequest
T
ThreadDao 已提交
1718 1719 1720
	ctx           context.Context
	indexService  types.IndexService
	masterService types.MasterService
G
godchen 已提交
1721
	result        *milvuspb.GetIndexStateResponse
1722 1723
}

1724
func (gist *GetIndexStateTask) TraceCtx() context.Context {
S
sunby 已提交
1725
	return gist.ctx
1726 1727
}

S
sunby 已提交
1728 1729
func (gist *GetIndexStateTask) ID() UniqueID {
	return gist.Base.MsgID
1730 1731
}

S
sunby 已提交
1732 1733
func (gist *GetIndexStateTask) SetID(uid UniqueID) {
	gist.Base.MsgID = uid
1734 1735
}

S
sunby 已提交
1736 1737
func (gist *GetIndexStateTask) Name() string {
	return GetIndexStateTaskName
1738 1739
}

S
sunby 已提交
1740 1741
func (gist *GetIndexStateTask) Type() commonpb.MsgType {
	return gist.Base.MsgType
1742 1743
}

S
sunby 已提交
1744 1745
func (gist *GetIndexStateTask) BeginTs() Timestamp {
	return gist.Base.Timestamp
1746 1747
}

S
sunby 已提交
1748 1749
func (gist *GetIndexStateTask) EndTs() Timestamp {
	return gist.Base.Timestamp
1750 1751
}

S
sunby 已提交
1752 1753 1754 1755 1756 1757 1758 1759
// SetTs stores ts as the timestamp in gist.Base.
func (gist *GetIndexStateTask) SetTs(ts Timestamp) {
	gist.Base.Timestamp = ts
}

// OnEnqueue replaces gist.Base with a freshly allocated, zero-valued MsgBase
// and always returns nil.
func (gist *GetIndexStateTask) OnEnqueue() error {
	gist.Base = new(commonpb.MsgBase)
	return nil
}
1760

S
sunby 已提交
1761
func (gist *GetIndexStateTask) PreExecute(ctx context.Context) error {
1762
	gist.Base.MsgType = commonpb.MsgType_GetIndexState
S
sunby 已提交
1763 1764 1765
	gist.Base.SourceID = Params.ProxyID

	collName, fieldName := gist.CollectionName, gist.FieldName
1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	if err := ValidateFieldName(fieldName); err != nil {
		return err
	}

	return nil
}

S
sunby 已提交
1778 1779
func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
	collectionName := gist.CollectionName
G
godchen 已提交
1780
	collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
Z
zhenshan.cao 已提交
1781 1782 1783 1784
	if err != nil { // err is not nil if collection not exists
		return err
	}

G
godchen 已提交
1785
	showPartitionRequest := &milvuspb.ShowPartitionsRequest{
Z
zhenshan.cao 已提交
1786
		Base: &commonpb.MsgBase{
1787
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
1788 1789
			MsgID:     gist.Base.MsgID,
			Timestamp: gist.Base.Timestamp,
Z
zhenshan.cao 已提交
1790 1791
			SourceID:  Params.ProxyID,
		},
S
sunby 已提交
1792
		DbName:         gist.DbName,
Z
zhenshan.cao 已提交
1793 1794 1795
		CollectionName: collectionName,
		CollectionID:   collectionID,
	}
T
ThreadDao 已提交
1796
	partitions, err := gist.masterService.ShowPartitions(ctx, showPartitionRequest)
Z
zhenshan.cao 已提交
1797 1798 1799 1800
	if err != nil {
		return err
	}

S
sunby 已提交
1801 1802
	if gist.IndexName == "" {
		gist.IndexName = Params.DefaultIndexName
1803 1804 1805 1806
	}

	describeIndexReq := milvuspb.DescribeIndexRequest{
		Base: &commonpb.MsgBase{
1807
			MsgType:   commonpb.MsgType_DescribeIndex,
S
sunby 已提交
1808 1809
			MsgID:     gist.Base.MsgID,
			Timestamp: gist.Base.Timestamp,
1810 1811
			SourceID:  Params.ProxyID,
		},
S
sunby 已提交
1812 1813 1814 1815
		DbName:         gist.DbName,
		CollectionName: gist.CollectionName,
		FieldName:      gist.FieldName,
		IndexName:      gist.IndexName,
1816 1817
	}

T
ThreadDao 已提交
1818
	indexDescriptionResp, err2 := gist.masterService.DescribeIndex(ctx, &describeIndexReq)
1819 1820 1821 1822 1823 1824 1825
	if err2 != nil {
		return err2
	}

	matchIndexID := int64(-1)
	foundIndexID := false
	for _, desc := range indexDescriptionResp.IndexDescriptions {
S
sunby 已提交
1826
		if desc.IndexName == gist.IndexName {
1827 1828 1829 1830 1831 1832
			matchIndexID = desc.IndexID
			foundIndexID = true
			break
		}
	}
	if !foundIndexID {
S
sunby 已提交
1833
		return errors.New(fmt.Sprint("Can't found IndexID for indexName", gist.IndexName))
1834 1835
	}

Z
zhenshan.cao 已提交
1836
	var allSegmentIDs []UniqueID
Z
zhenshan.cao 已提交
1837
	for _, partitionID := range partitions.PartitionIDs {
G
godchen 已提交
1838
		showSegmentsRequest := &milvuspb.ShowSegmentsRequest{
Z
zhenshan.cao 已提交
1839
			Base: &commonpb.MsgBase{
1840
				MsgType:   commonpb.MsgType_ShowSegments,
S
sunby 已提交
1841 1842
				MsgID:     gist.Base.MsgID,
				Timestamp: gist.Base.Timestamp,
Z
zhenshan.cao 已提交
1843 1844 1845 1846 1847
				SourceID:  Params.ProxyID,
			},
			CollectionID: collectionID,
			PartitionID:  partitionID,
		}
T
ThreadDao 已提交
1848
		segments, err := gist.masterService.ShowSegments(ctx, showSegmentsRequest)
Z
zhenshan.cao 已提交
1849 1850 1851
		if err != nil {
			return err
		}
1852
		if segments.Status.ErrorCode != commonpb.ErrorCode_Success {
Z
zhenshan.cao 已提交
1853
			return errors.New(segments.Status.Reason)
Z
zhenshan.cao 已提交
1854
		}
Z
zhenshan.cao 已提交
1855 1856 1857
		allSegmentIDs = append(allSegmentIDs, segments.SegmentIDs...)
	}

G
godchen 已提交
1858
	getIndexStatesRequest := &indexpb.GetIndexStatesRequest{
Z
zhenshan.cao 已提交
1859 1860
		IndexBuildIDs: make([]UniqueID, 0),
	}
1861 1862
	enableIndexBitMap := make([]bool, 0)
	indexBuildIDs := make([]UniqueID, 0)
Z
zhenshan.cao 已提交
1863

Z
zhenshan.cao 已提交
1864 1865 1866
	for _, segmentID := range allSegmentIDs {
		describeSegmentRequest := &milvuspb.DescribeSegmentRequest{
			Base: &commonpb.MsgBase{
1867
				MsgType:   commonpb.MsgType_DescribeSegment,
S
sunby 已提交
1868 1869
				MsgID:     gist.Base.MsgID,
				Timestamp: gist.Base.Timestamp,
Z
zhenshan.cao 已提交
1870 1871 1872 1873 1874
				SourceID:  Params.ProxyID,
			},
			CollectionID: collectionID,
			SegmentID:    segmentID,
		}
T
ThreadDao 已提交
1875
		segmentDesc, err := gist.masterService.DescribeSegment(ctx, describeSegmentRequest)
Z
zhenshan.cao 已提交
1876 1877 1878
		if err != nil {
			return err
		}
Z
zhenshan.cao 已提交
1879
		if segmentDesc.IndexID == matchIndexID {
1880 1881 1882 1883 1884 1885
			indexBuildIDs = append(indexBuildIDs, segmentDesc.BuildID)
			if segmentDesc.EnableIndex {
				enableIndexBitMap = append(enableIndexBitMap, true)
			} else {
				enableIndexBitMap = append(enableIndexBitMap, false)
			}
Z
zhenshan.cao 已提交
1886 1887
		}
	}
Z
zhenshan.cao 已提交
1888

1889 1890
	log.Debug("proxynode", zap.Int("GetIndexState:: len of allSegmentIDs", len(allSegmentIDs)))
	log.Debug("proxynode", zap.Int("GetIndexState:: len of IndexBuildIDs", len(indexBuildIDs)))
1891
	if len(allSegmentIDs) != len(indexBuildIDs) {
G
godchen 已提交
1892
		gist.result = &milvuspb.GetIndexStateResponse{
Z
zhenshan.cao 已提交
1893
			Status: &commonpb.Status{
1894
				ErrorCode: commonpb.ErrorCode_Success,
Z
zhenshan.cao 已提交
1895 1896
				Reason:    "",
			},
T
ThreadDao 已提交
1897
			State: commonpb.IndexState_InProgress,
Z
zhenshan.cao 已提交
1898 1899 1900 1901
		}
		return err
	}

1902 1903 1904 1905 1906
	for idx, enableIndex := range enableIndexBitMap {
		if enableIndex {
			getIndexStatesRequest.IndexBuildIDs = append(getIndexStatesRequest.IndexBuildIDs, indexBuildIDs[idx])
		}
	}
T
ThreadDao 已提交
1907
	states, err := gist.indexService.GetIndexStates(ctx, getIndexStatesRequest)
Z
zhenshan.cao 已提交
1908 1909 1910 1911
	if err != nil {
		return err
	}

1912
	if states.Status.ErrorCode != commonpb.ErrorCode_Success {
G
godchen 已提交
1913
		gist.result = &milvuspb.GetIndexStateResponse{
Z
zhenshan.cao 已提交
1914
			Status: states.Status,
T
ThreadDao 已提交
1915
			State:  commonpb.IndexState_Failed,
Z
zhenshan.cao 已提交
1916 1917 1918 1919 1920 1921
		}

		return nil
	}

	for _, state := range states.States {
T
ThreadDao 已提交
1922
		if state.State != commonpb.IndexState_Finished {
G
godchen 已提交
1923
			gist.result = &milvuspb.GetIndexStateResponse{
Z
zhenshan.cao 已提交
1924
				Status: states.Status,
Z
zhenshan.cao 已提交
1925
				State:  state.State,
Z
zhenshan.cao 已提交
1926 1927
			}

Z
zhenshan.cao 已提交
1928
			return nil
Z
zhenshan.cao 已提交
1929 1930 1931
		}
	}

G
godchen 已提交
1932
	gist.result = &milvuspb.GetIndexStateResponse{
1933
		Status: &commonpb.Status{
1934
			ErrorCode: commonpb.ErrorCode_Success,
1935 1936
			Reason:    "",
		},
T
ThreadDao 已提交
1937
		State: commonpb.IndexState_Finished,
1938
	}
Z
zhenshan.cao 已提交
1939

1940
	return nil
1941 1942
}

S
sunby 已提交
1943
func (gist *GetIndexStateTask) PostExecute(ctx context.Context) error {
1944 1945
	return nil
}
Z
zhenshan.cao 已提交
1946 1947 1948 1949

type FlushTask struct {
	Condition
	*milvuspb.FlushRequest
T
ThreadDao 已提交
1950 1951 1952
	ctx         context.Context
	dataService types.DataService
	result      *commonpb.Status
Z
zhenshan.cao 已提交
1953 1954
}

1955
func (ft *FlushTask) TraceCtx() context.Context {
S
sunby 已提交
1956
	return ft.ctx
Z
zhenshan.cao 已提交
1957 1958 1959 1960 1961 1962 1963 1964 1965 1966
}

// ID returns the message ID held in ft.Base.
func (ft *FlushTask) ID() UniqueID {
	return ft.Base.MsgID
}

// SetID stores uid as the MsgID of the request's MsgBase.
func (ft *FlushTask) SetID(uid UniqueID) { ft.Base.MsgID = uid }

S
sunby 已提交
1967 1968 1969 1970
// Name returns the task name constant FlushTaskName.
func (ft *FlushTask) Name() string { return FlushTaskName }

Z
zhenshan.cao 已提交
1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986
// Type returns the message type recorded in the request's MsgBase.
func (ft *FlushTask) Type() commonpb.MsgType { return ft.Base.MsgType }

// BeginTs returns the timestamp stored in the request's MsgBase.
func (ft *FlushTask) BeginTs() Timestamp { return ft.Base.Timestamp }

// EndTs returns the timestamp stored in the request's MsgBase.
func (ft *FlushTask) EndTs() Timestamp { return ft.Base.Timestamp }

// SetTs stores ts as the timestamp of the request's MsgBase.
func (ft *FlushTask) SetTs(ts Timestamp) { ft.Base.Timestamp = ts }

S
sunby 已提交
1987 1988 1989 1990 1991 1992
// OnEnqueue replaces the request's MsgBase with a fresh zero-valued one and
// always returns nil.
func (ft *FlushTask) OnEnqueue() error {
	ft.Base = new(commonpb.MsgBase)
	return nil
}

func (ft *FlushTask) PreExecute(ctx context.Context) error {
1993
	ft.Base.MsgType = commonpb.MsgType_Flush
Z
zhenshan.cao 已提交
1994 1995 1996 1997
	ft.Base.SourceID = Params.ProxyID
	return nil
}

S
sunby 已提交
1998
func (ft *FlushTask) Execute(ctx context.Context) error {
1999
	for _, collName := range ft.CollectionNames {
G
godchen 已提交
2000
		collID, err := globalMetaCache.GetCollectionID(ctx, collName)
2001 2002 2003 2004 2005
		if err != nil {
			return err
		}
		flushReq := &datapb.FlushRequest{
			Base: &commonpb.MsgBase{
2006
				MsgType:   commonpb.MsgType_Flush,
2007 2008 2009 2010 2011 2012 2013 2014
				MsgID:     ft.Base.MsgID,
				Timestamp: ft.Base.Timestamp,
				SourceID:  ft.Base.SourceID,
			},
			DbID:         0,
			CollectionID: collID,
		}
		var status *commonpb.Status
T
ThreadDao 已提交
2015
		status, _ = ft.dataService.Flush(ctx, flushReq)
G
godchen 已提交
2016 2017
		if status == nil {
			return errors.New("flush resp is nil")
2018
		}
2019
		if status.ErrorCode != commonpb.ErrorCode_Success {
2020 2021
			return errors.New(status.Reason)
		}
Z
zhenshan.cao 已提交
2022
	}
2023
	ft.result = &commonpb.Status{
2024
		ErrorCode: commonpb.ErrorCode_Success,
Z
zhenshan.cao 已提交
2025
	}
2026
	return nil
Z
zhenshan.cao 已提交
2027 2028
}

S
sunby 已提交
2029
func (ft *FlushTask) PostExecute(ctx context.Context) error {
Z
zhenshan.cao 已提交
2030 2031
	return nil
}
2032 2033 2034 2035

type LoadCollectionTask struct {
	Condition
	*milvuspb.LoadCollectionRequest
T
ThreadDao 已提交
2036 2037 2038
	ctx          context.Context
	queryService types.QueryService
	result       *commonpb.Status
2039 2040
}

2041
func (lct *LoadCollectionTask) TraceCtx() context.Context {
S
sunby 已提交
2042
	return lct.ctx
2043 2044 2045 2046 2047 2048 2049 2050 2051 2052
}

// ID returns the MsgID held in the request's MsgBase.
func (lct *LoadCollectionTask) ID() UniqueID { return lct.Base.MsgID }

// SetID stores uid as the MsgID of the request's MsgBase.
func (lct *LoadCollectionTask) SetID(uid UniqueID) { lct.Base.MsgID = uid }

S
sunby 已提交
2053 2054 2055 2056
// Name returns the task name constant LoadCollectionTaskName.
func (lct *LoadCollectionTask) Name() string { return LoadCollectionTaskName }

2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072
// Type returns the message type recorded in the request's MsgBase.
func (lct *LoadCollectionTask) Type() commonpb.MsgType { return lct.Base.MsgType }

// BeginTs returns the timestamp stored in the request's MsgBase.
func (lct *LoadCollectionTask) BeginTs() Timestamp { return lct.Base.Timestamp }

// EndTs returns the timestamp stored in the request's MsgBase.
func (lct *LoadCollectionTask) EndTs() Timestamp { return lct.Base.Timestamp }

// SetTs stores ts as the timestamp of the request's MsgBase.
func (lct *LoadCollectionTask) SetTs(ts Timestamp) { lct.Base.Timestamp = ts }

S
sunby 已提交
2073 2074 2075 2076 2077 2078
// OnEnqueue replaces the request's MsgBase with a fresh zero-valued one and
// always returns nil.
func (lct *LoadCollectionTask) OnEnqueue() error {
	lct.Base = new(commonpb.MsgBase)
	return nil
}

func (lct *LoadCollectionTask) PreExecute(ctx context.Context) error {
S
sunby 已提交
2079
	log.Debug("LoadCollectionTask PreExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
2080
	lct.Base.MsgType = commonpb.MsgType_LoadCollection
2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091
	lct.Base.SourceID = Params.ProxyID

	collName := lct.CollectionName

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	return nil
}

S
sunby 已提交
2092
func (lct *LoadCollectionTask) Execute(ctx context.Context) (err error) {
S
sunby 已提交
2093
	log.Debug("LoadCollectionTask Execute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
G
godchen 已提交
2094
	collID, err := globalMetaCache.GetCollectionID(ctx, lct.CollectionName)
2095 2096 2097
	if err != nil {
		return err
	}
G
godchen 已提交
2098
	collSchema, err := globalMetaCache.GetCollectionSchema(ctx, lct.CollectionName)
2099 2100 2101 2102
	if err != nil {
		return err
	}

2103 2104
	request := &querypb.LoadCollectionRequest{
		Base: &commonpb.MsgBase{
2105
			MsgType:   commonpb.MsgType_LoadCollection,
2106 2107 2108 2109 2110 2111
			MsgID:     lct.Base.MsgID,
			Timestamp: lct.Base.Timestamp,
			SourceID:  lct.Base.SourceID,
		},
		DbID:         0,
		CollectionID: collID,
2112
		Schema:       collSchema,
2113
	}
S
sunby 已提交
2114 2115
	log.Debug("send LoadCollectionRequest to query service", zap.String("role", Params.RoleName), zap.Int64("msgID", request.Base.MsgID), zap.Int64("collectionID", request.CollectionID),
		zap.Any("schema", request.Schema))
T
ThreadDao 已提交
2116
	lct.result, err = lct.queryService.LoadCollection(ctx, request)
S
sunby 已提交
2117 2118 2119 2120
	if err != nil {
		return fmt.Errorf("call query service LoadCollection: %s", err)
	}
	return nil
2121 2122
}

S
sunby 已提交
2123
func (lct *LoadCollectionTask) PostExecute(ctx context.Context) error {
S
sunby 已提交
2124
	log.Debug("LoadCollectionTask PostExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
2125 2126 2127 2128 2129 2130
	return nil
}

type ReleaseCollectionTask struct {
	Condition
	*milvuspb.ReleaseCollectionRequest
T
ThreadDao 已提交
2131 2132 2133
	ctx          context.Context
	queryService types.QueryService
	result       *commonpb.Status
2134 2135
}

2136
func (rct *ReleaseCollectionTask) TraceCtx() context.Context {
S
sunby 已提交
2137
	return rct.ctx
2138 2139 2140 2141 2142 2143 2144 2145 2146 2147
}

// ID returns the MsgID held in the request's MsgBase.
func (rct *ReleaseCollectionTask) ID() UniqueID { return rct.Base.MsgID }

// SetID stores uid as the MsgID of the request's MsgBase.
func (rct *ReleaseCollectionTask) SetID(uid UniqueID) { rct.Base.MsgID = uid }

S
sunby 已提交
2148 2149 2150 2151
// Name returns the task name constant ReleaseCollectionTaskName.
func (rct *ReleaseCollectionTask) Name() string { return ReleaseCollectionTaskName }

2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167
// Type returns the message type recorded in the request's MsgBase.
func (rct *ReleaseCollectionTask) Type() commonpb.MsgType { return rct.Base.MsgType }

// BeginTs returns the timestamp stored in the request's MsgBase.
func (rct *ReleaseCollectionTask) BeginTs() Timestamp { return rct.Base.Timestamp }

// EndTs returns the timestamp stored in the request's MsgBase.
func (rct *ReleaseCollectionTask) EndTs() Timestamp { return rct.Base.Timestamp }

// SetTs stores ts as the timestamp of the request's MsgBase.
func (rct *ReleaseCollectionTask) SetTs(ts Timestamp) { rct.Base.Timestamp = ts }

S
sunby 已提交
2168 2169 2170 2171 2172 2173
// OnEnqueue replaces the request's MsgBase with a fresh zero-valued one and
// always returns nil.
func (rct *ReleaseCollectionTask) OnEnqueue() error {
	rct.Base = new(commonpb.MsgBase)
	return nil
}

func (rct *ReleaseCollectionTask) PreExecute(ctx context.Context) error {
2174
	rct.Base.MsgType = commonpb.MsgType_ReleaseCollection
2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185
	rct.Base.SourceID = Params.ProxyID

	collName := rct.CollectionName

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	return nil
}

S
sunby 已提交
2186
func (rct *ReleaseCollectionTask) Execute(ctx context.Context) (err error) {
G
godchen 已提交
2187
	collID, err := globalMetaCache.GetCollectionID(ctx, rct.CollectionName)
2188 2189 2190 2191 2192
	if err != nil {
		return err
	}
	request := &querypb.ReleaseCollectionRequest{
		Base: &commonpb.MsgBase{
2193
			MsgType:   commonpb.MsgType_ReleaseCollection,
2194 2195 2196 2197 2198 2199 2200
			MsgID:     rct.Base.MsgID,
			Timestamp: rct.Base.Timestamp,
			SourceID:  rct.Base.SourceID,
		},
		DbID:         0,
		CollectionID: collID,
	}
T
ThreadDao 已提交
2201
	rct.result, err = rct.queryService.ReleaseCollection(ctx, request)
2202 2203 2204
	return err
}

S
sunby 已提交
2205
func (rct *ReleaseCollectionTask) PostExecute(ctx context.Context) error {
2206 2207 2208 2209 2210
	return nil
}

type LoadPartitionTask struct {
	Condition
G
godchen 已提交
2211
	*milvuspb.LoadPartitionsRequest
T
ThreadDao 已提交
2212 2213 2214
	ctx          context.Context
	queryService types.QueryService
	result       *commonpb.Status
2215 2216
}

2217 2218 2219 2220
// TraceCtx returns the context stored on the task.
func (lpt *LoadPartitionTask) TraceCtx() context.Context { return lpt.ctx }

2221 2222 2223 2224 2225 2226 2227 2228
// ID returns the MsgID held in the request's MsgBase.
func (lpt *LoadPartitionTask) ID() UniqueID { return lpt.Base.MsgID }

// SetID stores uid as the MsgID of the request's MsgBase.
func (lpt *LoadPartitionTask) SetID(uid UniqueID) { lpt.Base.MsgID = uid }

S
sunby 已提交
2229 2230 2231 2232
// Name returns the task name constant LoadPartitionTaskName.
func (lpt *LoadPartitionTask) Name() string { return LoadPartitionTaskName }

2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248
// Type returns the message type recorded in the request's MsgBase.
func (lpt *LoadPartitionTask) Type() commonpb.MsgType { return lpt.Base.MsgType }

// BeginTs returns the timestamp stored in the request's MsgBase.
func (lpt *LoadPartitionTask) BeginTs() Timestamp { return lpt.Base.Timestamp }

// EndTs returns the timestamp stored in the request's MsgBase.
func (lpt *LoadPartitionTask) EndTs() Timestamp { return lpt.Base.Timestamp }

// SetTs stores ts as the timestamp of the request's MsgBase.
func (lpt *LoadPartitionTask) SetTs(ts Timestamp) { lpt.Base.Timestamp = ts }

S
sunby 已提交
2249 2250 2251 2252 2253 2254
// OnEnqueue replaces the request's MsgBase with a fresh zero-valued one and
// always returns nil.
func (lpt *LoadPartitionTask) OnEnqueue() error {
	lpt.Base = new(commonpb.MsgBase)
	return nil
}

func (lpt *LoadPartitionTask) PreExecute(ctx context.Context) error {
2255
	lpt.Base.MsgType = commonpb.MsgType_LoadPartitions
2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266
	lpt.Base.SourceID = Params.ProxyID

	collName := lpt.CollectionName

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	return nil
}

S
sunby 已提交
2267
func (lpt *LoadPartitionTask) Execute(ctx context.Context) error {
2268
	var partitionIDs []int64
G
godchen 已提交
2269
	collID, err := globalMetaCache.GetCollectionID(ctx, lpt.CollectionName)
2270 2271 2272
	if err != nil {
		return err
	}
G
godchen 已提交
2273
	collSchema, err := globalMetaCache.GetCollectionSchema(ctx, lpt.CollectionName)
2274 2275 2276
	if err != nil {
		return err
	}
2277
	for _, partitionName := range lpt.PartitionNames {
G
godchen 已提交
2278
		partitionID, err := globalMetaCache.GetPartitionID(ctx, lpt.CollectionName, partitionName)
2279 2280 2281 2282 2283
		if err != nil {
			return err
		}
		partitionIDs = append(partitionIDs, partitionID)
	}
G
godchen 已提交
2284
	request := &querypb.LoadPartitionsRequest{
2285
		Base: &commonpb.MsgBase{
2286
			MsgType:   commonpb.MsgType_LoadPartitions,
2287 2288 2289 2290 2291 2292 2293
			MsgID:     lpt.Base.MsgID,
			Timestamp: lpt.Base.Timestamp,
			SourceID:  lpt.Base.SourceID,
		},
		DbID:         0,
		CollectionID: collID,
		PartitionIDs: partitionIDs,
2294
		Schema:       collSchema,
2295
	}
T
ThreadDao 已提交
2296
	lpt.result, err = lpt.queryService.LoadPartitions(ctx, request)
2297 2298 2299
	return err
}

S
sunby 已提交
2300
func (lpt *LoadPartitionTask) PostExecute(ctx context.Context) error {
2301 2302 2303 2304 2305
	return nil
}

type ReleasePartitionTask struct {
	Condition
G
godchen 已提交
2306
	*milvuspb.ReleasePartitionsRequest
T
ThreadDao 已提交
2307 2308 2309
	ctx          context.Context
	queryService types.QueryService
	result       *commonpb.Status
2310 2311
}

2312
func (rpt *ReleasePartitionTask) TraceCtx() context.Context {
S
sunby 已提交
2313
	return rpt.ctx
2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327
}

// ID returns the MsgID held in the request's MsgBase.
func (rpt *ReleasePartitionTask) ID() UniqueID { return rpt.Base.MsgID }

// SetID stores uid as the MsgID of the request's MsgBase.
func (rpt *ReleasePartitionTask) SetID(uid UniqueID) { rpt.Base.MsgID = uid }

// Type returns the message type recorded in the request's MsgBase.
func (rpt *ReleasePartitionTask) Type() commonpb.MsgType { return rpt.Base.MsgType }

S
sunby 已提交
2328 2329 2330 2331
// Name returns the task name constant ReleasePartitionTaskName.
func (rpt *ReleasePartitionTask) Name() string { return ReleasePartitionTaskName }

2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343
// BeginTs returns the timestamp stored in the request's MsgBase.
func (rpt *ReleasePartitionTask) BeginTs() Timestamp { return rpt.Base.Timestamp }

// EndTs returns the timestamp stored in the request's MsgBase.
func (rpt *ReleasePartitionTask) EndTs() Timestamp { return rpt.Base.Timestamp }

// SetTs stores ts as the timestamp of the request's MsgBase.
func (rpt *ReleasePartitionTask) SetTs(ts Timestamp) { rpt.Base.Timestamp = ts }

S
sunby 已提交
2344 2345 2346 2347 2348 2349
// OnEnqueue replaces the request's MsgBase with a fresh zero-valued one and
// always returns nil.
func (rpt *ReleasePartitionTask) OnEnqueue() error {
	rpt.Base = new(commonpb.MsgBase)
	return nil
}

func (rpt *ReleasePartitionTask) PreExecute(ctx context.Context) error {
2350
	rpt.Base.MsgType = commonpb.MsgType_ReleasePartitions
2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361
	rpt.Base.SourceID = Params.ProxyID

	collName := rpt.CollectionName

	if err := ValidateCollectionName(collName); err != nil {
		return err
	}

	return nil
}

S
sunby 已提交
2362
func (rpt *ReleasePartitionTask) Execute(ctx context.Context) (err error) {
2363
	var partitionIDs []int64
G
godchen 已提交
2364
	collID, err := globalMetaCache.GetCollectionID(ctx, rpt.CollectionName)
2365 2366 2367 2368
	if err != nil {
		return err
	}
	for _, partitionName := range rpt.PartitionNames {
G
godchen 已提交
2369
		partitionID, err := globalMetaCache.GetPartitionID(ctx, rpt.CollectionName, partitionName)
2370 2371 2372 2373 2374
		if err != nil {
			return err
		}
		partitionIDs = append(partitionIDs, partitionID)
	}
G
godchen 已提交
2375
	request := &querypb.ReleasePartitionsRequest{
2376
		Base: &commonpb.MsgBase{
2377
			MsgType:   commonpb.MsgType_ReleasePartitions,
2378 2379 2380 2381 2382 2383 2384 2385
			MsgID:     rpt.Base.MsgID,
			Timestamp: rpt.Base.Timestamp,
			SourceID:  rpt.Base.SourceID,
		},
		DbID:         0,
		CollectionID: collID,
		PartitionIDs: partitionIDs,
	}
T
ThreadDao 已提交
2386
	rpt.result, err = rpt.queryService.ReleasePartitions(ctx, request)
2387 2388 2389
	return err
}

S
sunby 已提交
2390
func (rpt *ReleasePartitionTask) PostExecute(ctx context.Context) error {
2391 2392
	return nil
}