task.go 22.4 KB
Newer Older
1 2 3
package masterservice

import (
G
groot 已提交
4
	"context"
5
	"fmt"
6

7
	"github.com/golang/protobuf/proto"
N
neza2017 已提交
8
	"github.com/zilliztech/milvus-distributed/internal/log"
9 10
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
G
godchen 已提交
11
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
12 13 14
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
15
	"go.uber.org/zap"
16 17 18
)

type reqTask interface {
G
groot 已提交
19
	Ctx() context.Context
20 21
	Type() commonpb.MsgType
	Ts() (typeutil.Timestamp, error)
22
	IgnoreTimeStamp() bool
G
groot 已提交
23
	Execute(ctx context.Context) error
24 25 26 27 28
	WaitToFinish() error
	Notify(err error)
}

type baseReqTask struct {
G
groot 已提交
29
	ctx  context.Context
30 31 32 33 34 35 36 37 38 39 40
	cv   chan error
	core *Core
}

func (bt *baseReqTask) Notify(err error) {
	bt.cv <- err
}

func (bt *baseReqTask) WaitToFinish() error {
	select {
	case <-bt.core.ctx.Done():
N
neza2017 已提交
41
		return fmt.Errorf("core context done, %s", bt.core.ctx.Err().Error())
B
bigsheeper 已提交
42
	case <-bt.ctx.Done():
N
neza2017 已提交
43
		return fmt.Errorf("request context done, %s", bt.ctx.Err().Error())
44 45
	case err, ok := <-bt.cv:
		if !ok {
N
neza2017 已提交
46
			return fmt.Errorf("notify chan closed")
47 48 49 50 51 52 53 54 55 56
		}
		return err
	}
}

type CreateCollectionReqTask struct {
	baseReqTask
	Req *milvuspb.CreateCollectionRequest
}

G
groot 已提交
57 58 59 60
func (t *CreateCollectionReqTask) Ctx() context.Context {
	return t.ctx
}

61 62 63
func (t *CreateCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}
64

65 66 67 68
func (t *CreateCollectionReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

69 70 71 72
func (t *CreateCollectionReqTask) IgnoreTimeStamp() bool {
	return false
}

G
groot 已提交
73
func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
N
neza2017 已提交
74 75 76
	if t.Type() != commonpb.MsgType_CreateCollection {
		return fmt.Errorf("create collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
77 78 79 80 81 82
	var schema schemapb.CollectionSchema
	err := proto.Unmarshal(t.Req.Schema, &schema)
	if err != nil {
		return err
	}

83
	if t.Req.CollectionName != schema.Name {
S
sunby 已提交
84
		return fmt.Errorf("collection name = %s, schema.Name=%s", t.Req.CollectionName, schema.Name)
85 86
	}

87 88 89 90 91 92 93 94
	for idx, field := range schema.Fields {
		field.FieldID = int64(idx + StartOfUserFieldID)
	}
	rowIDField := &schemapb.FieldSchema{
		FieldID:      int64(RowIDField),
		Name:         RowIDFieldName,
		IsPrimaryKey: false,
		Description:  "row id",
G
godchen 已提交
95
		DataType:     schemapb.DataType_Int64,
96 97 98 99 100 101
	}
	timeStampField := &schemapb.FieldSchema{
		FieldID:      int64(TimeStampField),
		Name:         TimeStampFieldName,
		IsPrimaryKey: false,
		Description:  "time stamp",
G
godchen 已提交
102
		DataType:     schemapb.DataType_Int64,
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
	}
	schema.Fields = append(schema.Fields, rowIDField, timeStampField)

	collID, err := t.core.idAllocator.AllocOne()
	if err != nil {
		return err
	}
	collTs, err := t.Ts()
	if err != nil {
		return err
	}
	partitionID, err := t.core.idAllocator.AllocOne()
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
118 119 120 121 122
	collMeta := etcdpb.CollectionInfo{
		ID:           collID,
		Schema:       &schema,
		CreateTime:   collTs,
		PartitionIDs: make([]typeutil.UniqueID, 0, 16),
N
neza2017 已提交
123
		FieldIndexes: make([]*etcdpb.FieldIndexInfo, 0, 16),
Z
zhenshan.cao 已提交
124 125 126 127 128
	}
	partMeta := etcdpb.PartitionInfo{
		PartitionName: Params.DefaultPartitionName,
		PartitionID:   partitionID,
		SegmentIDs:    make([]typeutil.UniqueID, 0, 16),
129
	}
N
neza2017 已提交
130
	idxInfo := make([]*etcdpb.IndexInfo, 0, 16)
131 132
	/////////////////////// ignore index param from create_collection /////////////////////////
	//for _, field := range schema.Fields {
G
godchen 已提交
133
	//	if field.DataType == schemapb.DataType_VectorFloat || field.DataType == schemapb.DataType_VectorBinary {
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
	//		if len(field.IndexParams) > 0 {
	//			idxID, err := t.core.idAllocator.AllocOne()
	//			if err != nil {
	//				return err
	//			}
	//			filedIdx := &etcdpb.FieldIndexInfo{
	//				FiledID: field.FieldID,
	//				IndexID: idxID,
	//			}
	//			idx := &etcdpb.IndexInfo{
	//				IndexName:   fmt.Sprintf("%s_index_%d", collMeta.Schema.Name, field.FieldID),
	//				IndexID:     idxID,
	//				IndexParams: field.IndexParams,
	//			}
	//			idxInfo = append(idxInfo, idx)
	//			collMeta.FieldIndexes = append(collMeta.FieldIndexes, filedIdx)
	//		}
	//	}
	//}
Z
zhenshan.cao 已提交
153

N
neza2017 已提交
154
	err = t.core.MetaTable.AddCollection(&collMeta, &partMeta, idxInfo)
155 156 157
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
158 159 160 161 162
	schemaBytes, err := proto.Marshal(&schema)
	if err != nil {
		return err
	}

G
godchen 已提交
163
	ddReq := internalpb.CreateCollectionRequest{
Z
zhenshan.cao 已提交
164 165 166 167 168 169 170 171
		Base:           t.Req.Base,
		DbName:         t.Req.DbName,
		CollectionName: t.Req.CollectionName,
		DbID:           0, //TODO,not used
		CollectionID:   collID,
		Schema:         schemaBytes,
	}

G
groot 已提交
172
	err = t.core.DdCreateCollectionReq(ctx, &ddReq)
173 174 175 176
	if err != nil {
		return err
	}

G
godchen 已提交
177
	ddPart := internalpb.CreatePartitionRequest{
G
godchen 已提交
178
		Base: &commonpb.MsgBase{
179
			MsgType:   commonpb.MsgType_CreatePartition,
G
godchen 已提交
180 181 182 183 184 185 186 187 188 189 190 191
			MsgID:     t.Req.Base.MsgID, //TODO, msg id
			Timestamp: t.Req.Base.Timestamp + 1,
			SourceID:  t.Req.Base.SourceID,
		},
		DbName:         t.Req.DbName,
		CollectionName: t.Req.CollectionName,
		PartitionName:  Params.DefaultPartitionName,
		DbID:           0, //TODO, not used
		CollectionID:   collMeta.ID,
		PartitionID:    partMeta.PartitionID,
	}

G
groot 已提交
192
	err = t.core.DdCreatePartitionReq(ctx, &ddPart)
G
godchen 已提交
193 194 195 196
	if err != nil {
		return err
	}

197 198 199 200 201 202 203 204
	return nil
}

type DropCollectionReqTask struct {
	baseReqTask
	Req *milvuspb.DropCollectionRequest
}

G
groot 已提交
205 206 207 208
func (t *DropCollectionReqTask) Ctx() context.Context {
	return t.ctx
}

209 210 211 212 213 214 215 216
func (t *DropCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *DropCollectionReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

217 218 219 220
func (t *DropCollectionReqTask) IgnoreTimeStamp() bool {
	return false
}

G
groot 已提交
221
func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
N
neza2017 已提交
222 223 224 225
	if t.Type() != commonpb.MsgType_DropCollection {
		return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}

226 227 228 229
	collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
	if err != nil {
		return err
	}
G
groot 已提交
230
	if err = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName); err != nil {
231 232
		return err
	}
233

234 235 236 237 238 239 240
	err = t.core.MetaTable.DeleteCollection(collMeta.ID)
	if err != nil {
		return err
	}

	//data service should drop segments , which belong to this collection, from the segment manager

G
godchen 已提交
241
	ddReq := internalpb.DropCollectionRequest{
Z
zhenshan.cao 已提交
242 243 244 245 246 247 248
		Base:           t.Req.Base,
		DbName:         t.Req.DbName,
		CollectionName: t.Req.CollectionName,
		DbID:           0, //not used
		CollectionID:   collMeta.ID,
	}

G
groot 已提交
249
	err = t.core.DdDropCollectionReq(ctx, &ddReq)
250 251 252
	if err != nil {
		return err
	}
253 254 255

	//notify query service to release collection
	go func() {
256
		if err = t.core.ReleaseCollection(t.core.ctx, t.Req.Base.Timestamp, 0, collMeta.ID); err != nil {
N
neza2017 已提交
257
			log.Warn("ReleaseCollection failed", zap.String("error", err.Error()))
258 259 260
		}
	}()

261 262 263 264 265 266 267 268 269
	return nil
}

type HasCollectionReqTask struct {
	baseReqTask
	Req           *milvuspb.HasCollectionRequest
	HasCollection bool
}

G
groot 已提交
270 271 272 273
func (t *HasCollectionReqTask) Ctx() context.Context {
	return t.ctx
}

274 275 276 277 278 279 280 281
func (t *HasCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *HasCollectionReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

282 283 284 285
func (t *HasCollectionReqTask) IgnoreTimeStamp() bool {
	return true
}

N
neza2017 已提交
286 287 288 289
func (t *HasCollectionReqTask) Execute(ctx context.Context) error {
	if t.Type() != commonpb.MsgType_HasCollection {
		return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
	_, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
	if err == nil {
		t.HasCollection = true
	} else {
		t.HasCollection = false
	}
	return nil
}

type DescribeCollectionReqTask struct {
	baseReqTask
	Req *milvuspb.DescribeCollectionRequest
	Rsp *milvuspb.DescribeCollectionResponse
}

G
groot 已提交
305 306 307 308
func (t *DescribeCollectionReqTask) Ctx() context.Context {
	return t.ctx
}

309 310 311 312 313 314 315 316
func (t *DescribeCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *DescribeCollectionReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

317 318 319 320
func (t *DescribeCollectionReqTask) IgnoreTimeStamp() bool {
	return true
}

G
groot 已提交
321
func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
N
neza2017 已提交
322 323 324
	if t.Type() != commonpb.MsgType_DescribeCollection {
		return fmt.Errorf("describe collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
N
neza2017 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337
	var coll *etcdpb.CollectionInfo
	var err error

	if t.Req.CollectionName != "" {
		coll, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
		if err != nil {
			return err
		}
	} else {
		coll, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
		if err != nil {
			return err
		}
338
	}
N
neza2017 已提交
339

340
	t.Rsp.Schema = proto.Clone(coll.Schema).(*schemapb.CollectionSchema)
341
	t.Rsp.CollectionID = coll.ID
342 343 344 345 346 347 348 349 350 351 352 353
	var newField []*schemapb.FieldSchema
	for _, field := range t.Rsp.Schema.Fields {
		if field.FieldID >= StartOfUserFieldID {
			newField = append(newField, field)
		}
	}
	t.Rsp.Schema.Fields = newField
	return nil
}

type ShowCollectionReqTask struct {
	baseReqTask
G
godchen 已提交
354 355
	Req *milvuspb.ShowCollectionsRequest
	Rsp *milvuspb.ShowCollectionsResponse
356 357
}

G
groot 已提交
358 359 360 361
func (t *ShowCollectionReqTask) Ctx() context.Context {
	return t.ctx
}

362 363 364 365 366 367 368 369
func (t *ShowCollectionReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *ShowCollectionReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

370 371 372 373
func (t *ShowCollectionReqTask) IgnoreTimeStamp() bool {
	return true
}

N
neza2017 已提交
374 375 376 377
func (t *ShowCollectionReqTask) Execute(ctx context.Context) error {
	if t.Type() != commonpb.MsgType_ShowCollections {
		return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
378 379 380 381 382 383 384 385 386 387 388 389 390
	coll, err := t.core.MetaTable.ListCollections()
	if err != nil {
		return err
	}
	t.Rsp.CollectionNames = coll
	return nil
}

type CreatePartitionReqTask struct {
	baseReqTask
	Req *milvuspb.CreatePartitionRequest
}

G
groot 已提交
391 392 393 394
func (t *CreatePartitionReqTask) Ctx() context.Context {
	return t.ctx
}

395 396 397 398 399 400 401 402
func (t *CreatePartitionReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *CreatePartitionReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

403 404 405 406
func (t *CreatePartitionReqTask) IgnoreTimeStamp() bool {
	return false
}

G
groot 已提交
407
func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
N
neza2017 已提交
408 409 410
	if t.Type() != commonpb.MsgType_CreatePartition {
		return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
411 412 413 414 415 416 417 418 419 420 421 422 423
	collMeta, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
	if err != nil {
		return err
	}
	partitionID, err := t.core.idAllocator.AllocOne()
	if err != nil {
		return err
	}
	err = t.core.MetaTable.AddPartition(collMeta.ID, t.Req.PartitionName, partitionID)
	if err != nil {
		return err
	}

G
godchen 已提交
424
	ddReq := internalpb.CreatePartitionRequest{
Z
zhenshan.cao 已提交
425 426 427 428 429 430 431 432 433
		Base:           t.Req.Base,
		DbName:         t.Req.DbName,
		CollectionName: t.Req.CollectionName,
		PartitionName:  t.Req.PartitionName,
		DbID:           0, // todo, not used
		CollectionID:   collMeta.ID,
		PartitionID:    partitionID,
	}

G
groot 已提交
434
	err = t.core.DdCreatePartitionReq(ctx, &ddReq)
435 436 437 438
	if err != nil {
		return err
	}

Z
zhenshan.cao 已提交
439 440 441
	// error doesn't matter here
	_ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)

442 443 444 445 446 447 448 449
	return nil
}

type DropPartitionReqTask struct {
	baseReqTask
	Req *milvuspb.DropPartitionRequest
}

G
groot 已提交
450 451 452 453
func (t *DropPartitionReqTask) Ctx() context.Context {
	return t.ctx
}

454 455 456 457 458 459 460 461
func (t *DropPartitionReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *DropPartitionReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

462 463 464 465
func (t *DropPartitionReqTask) IgnoreTimeStamp() bool {
	return false
}

G
groot 已提交
466
func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
N
neza2017 已提交
467 468 469
	if t.Type() != commonpb.MsgType_DropPartition {
		return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
470 471 472 473
	coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
474
	partID, err := t.core.MetaTable.DeletePartition(coll.ID, t.Req.PartitionName)
475 476 477 478
	if err != nil {
		return err
	}

G
godchen 已提交
479
	ddReq := internalpb.DropPartitionRequest{
Z
zhenshan.cao 已提交
480 481 482 483 484 485 486 487 488
		Base:           t.Req.Base,
		DbName:         t.Req.DbName,
		CollectionName: t.Req.CollectionName,
		PartitionName:  t.Req.PartitionName,
		DbID:           0, //todo,not used
		CollectionID:   coll.ID,
		PartitionID:    partID,
	}

G
groot 已提交
489
	err = t.core.DdDropPartitionReq(ctx, &ddReq)
490 491 492
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
493 494 495

	// error doesn't matter here
	_ = t.core.InvalidateCollectionMetaCache(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName)
496 497 498 499 500 501 502 503 504
	return nil
}

type HasPartitionReqTask struct {
	baseReqTask
	Req          *milvuspb.HasPartitionRequest
	HasPartition bool
}

G
groot 已提交
505 506 507 508
func (t *HasPartitionReqTask) Ctx() context.Context {
	return t.ctx
}

509 510 511 512 513 514 515 516
func (t *HasPartitionReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *HasPartitionReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

517 518 519 520
func (t *HasPartitionReqTask) IgnoreTimeStamp() bool {
	return true
}

N
neza2017 已提交
521 522 523 524
func (t *HasPartitionReqTask) Execute(ctx context.Context) error {
	if t.Type() != commonpb.MsgType_HasPartition {
		return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
525 526 527 528 529 530 531 532 533 534
	coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
	if err != nil {
		return err
	}
	t.HasPartition = t.core.MetaTable.HasPartition(coll.ID, t.Req.PartitionName)
	return nil
}

type ShowPartitionReqTask struct {
	baseReqTask
G
godchen 已提交
535 536
	Req *milvuspb.ShowPartitionsRequest
	Rsp *milvuspb.ShowPartitionsResponse
537 538
}

G
groot 已提交
539 540 541 542
func (t *ShowPartitionReqTask) Ctx() context.Context {
	return t.ctx
}

543 544 545 546 547 548 549 550
func (t *ShowPartitionReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *ShowPartitionReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

551 552 553 554
func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool {
	return true
}

N
neza2017 已提交
555 556 557 558
func (t *ShowPartitionReqTask) Execute(ctx context.Context) error {
	if t.Type() != commonpb.MsgType_ShowPartitions {
		return fmt.Errorf("show partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
G
godchen 已提交
559 560 561 562 563 564 565
	var coll *etcdpb.CollectionInfo
	var err error
	if t.Req.CollectionName == "" {
		coll, err = t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
	} else {
		coll, err = t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
	}
566 567 568
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
569 570 571 572 573 574 575 576
	for _, partID := range coll.PartitionIDs {
		partMeta, err := t.core.MetaTable.GetPartitionByID(partID)
		if err != nil {
			return err
		}
		t.Rsp.PartitionIDs = append(t.Rsp.PartitionIDs, partMeta.PartitionID)
		t.Rsp.PartitionNames = append(t.Rsp.PartitionNames, partMeta.PartitionName)
	}
577 578
	return nil
}
N
neza2017 已提交
579 580 581 582 583 584 585

type DescribeSegmentReqTask struct {
	baseReqTask
	Req *milvuspb.DescribeSegmentRequest
	Rsp *milvuspb.DescribeSegmentResponse //TODO,return repeated segment id in the future
}

G
groot 已提交
586 587 588 589
func (t *DescribeSegmentReqTask) Ctx() context.Context {
	return t.ctx
}

N
neza2017 已提交
590 591 592 593 594 595 596 597
func (t *DescribeSegmentReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *DescribeSegmentReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

598 599 600 601
func (t *DescribeSegmentReqTask) IgnoreTimeStamp() bool {
	return true
}

N
neza2017 已提交
602 603 604 605
func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
	if t.Type() != commonpb.MsgType_DescribeSegment {
		return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
N
neza2017 已提交
606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626
	coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
	if err != nil {
		return err
	}
	exist := false
	for _, partID := range coll.PartitionIDs {
		if exist {
			break
		}
		partMeta, err := t.core.MetaTable.GetPartitionByID(partID)
		if err != nil {
			return err
		}
		for _, e := range partMeta.SegmentIDs {
			if e == t.Req.SegmentID {
				exist = true
				break
			}
		}
	}
	if !exist {
S
sunby 已提交
627
		return fmt.Errorf("segment id %d not belong to collection id %d", t.Req.SegmentID, t.Req.CollectionID)
N
neza2017 已提交
628 629 630 631 632 633 634
	}
	//TODO, get filed_id and index_name from request
	segIdxInfo, err := t.core.MetaTable.GetSegmentIndexInfoByID(t.Req.SegmentID, -1, "")
	if err != nil {
		return err
	}
	t.Rsp.IndexID = segIdxInfo.IndexID
N
neza2017 已提交
635
	t.Rsp.BuildID = segIdxInfo.BuildID
636
	t.Rsp.EnableIndex = segIdxInfo.EnableIndex
N
neza2017 已提交
637 638 639 640 641
	return nil
}

type ShowSegmentReqTask struct {
	baseReqTask
G
godchen 已提交
642 643
	Req *milvuspb.ShowSegmentsRequest
	Rsp *milvuspb.ShowSegmentsResponse
N
neza2017 已提交
644 645
}

G
groot 已提交
646 647 648 649
func (t *ShowSegmentReqTask) Ctx() context.Context {
	return t.ctx
}

N
neza2017 已提交
650 651 652 653 654 655 656 657
func (t *ShowSegmentReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *ShowSegmentReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

658 659 660 661
func (t *ShowSegmentReqTask) IgnoreTimeStamp() bool {
	return true
}

N
neza2017 已提交
662 663 664 665
func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
	if t.Type() != commonpb.MsgType_ShowSegments {
		return fmt.Errorf("show segments, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
N
neza2017 已提交
666 667 668 669
	coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
	if err != nil {
		return err
	}
N
neza2017 已提交
670
	exist := false
N
neza2017 已提交
671
	for _, partID := range coll.PartitionIDs {
N
neza2017 已提交
672 673 674
		if partID == t.Req.PartitionID {
			exist = true
			break
N
neza2017 已提交
675 676
		}
	}
N
neza2017 已提交
677
	if !exist {
S
sunby 已提交
678
		return fmt.Errorf("partition id = %d not belong to collection id = %d", t.Req.PartitionID, t.Req.CollectionID)
N
neza2017 已提交
679 680 681 682 683 684
	}
	partMeta, err := t.core.MetaTable.GetPartitionByID(t.Req.PartitionID)
	if err != nil {
		return err
	}
	t.Rsp.SegmentIDs = append(t.Rsp.SegmentIDs, partMeta.SegmentIDs...)
N
neza2017 已提交
685 686 687 688 689 690 691 692
	return nil
}

type CreateIndexReqTask struct {
	baseReqTask
	Req *milvuspb.CreateIndexRequest
}

G
groot 已提交
693 694 695 696
func (t *CreateIndexReqTask) Ctx() context.Context {
	return t.ctx
}

N
neza2017 已提交
697 698 699 700 701 702 703 704
func (t *CreateIndexReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *CreateIndexReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

705 706 707 708
func (t *CreateIndexReqTask) IgnoreTimeStamp() bool {
	return false
}

G
groot 已提交
709
func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
N
neza2017 已提交
710 711 712
	if t.Type() != commonpb.MsgType_CreateIndex {
		return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
N
neza2017 已提交
713
	indexName := Params.DefaultIndexName //TODO, get name from request
N
neza2017 已提交
714 715 716 717 718 719 720 721 722 723
	indexID, err := t.core.idAllocator.AllocOne()
	if err != nil {
		return err
	}
	idxInfo := &etcdpb.IndexInfo{
		IndexName:   indexName,
		IndexID:     indexID,
		IndexParams: t.Req.ExtraParams,
	}
	segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, idxInfo)
N
neza2017 已提交
724 725 726
	if err != nil {
		return err
	}
G
godchen 已提交
727
	if field.DataType != schemapb.DataType_FloatVector && field.DataType != schemapb.DataType_BinaryVector {
S
sunby 已提交
728
		return fmt.Errorf("field name = %s, data type = %s", t.Req.FieldName, schemapb.DataType_name[int32(field.DataType)])
729
	}
N
neza2017 已提交
730 731
	for _, seg := range segIDs {
		task := CreateIndexTask{
N
neza2017 已提交
732 733 734 735 736 737 738 739
			ctx:               t.core.ctx,
			core:              t.core,
			segmentID:         seg,
			indexName:         idxInfo.IndexName,
			indexID:           idxInfo.IndexID,
			fieldSchema:       &field,
			indexParams:       t.Req.ExtraParams,
			isFromFlushedChan: false,
N
neza2017 已提交
740 741
		}
		t.core.indexTaskQueue <- &task
742
		fmt.Println("create index task enqueue, segID = ", seg)
N
neza2017 已提交
743 744 745 746 747 748 749 750 751 752
	}
	return nil
}

type DescribeIndexReqTask struct {
	baseReqTask
	Req *milvuspb.DescribeIndexRequest
	Rsp *milvuspb.DescribeIndexResponse
}

G
groot 已提交
753 754 755 756
func (t *DescribeIndexReqTask) Ctx() context.Context {
	return t.ctx
}

N
neza2017 已提交
757 758 759 760 761 762 763 764
func (t *DescribeIndexReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *DescribeIndexReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

765 766 767 768
func (t *DescribeIndexReqTask) IgnoreTimeStamp() bool {
	return true
}

N
neza2017 已提交
769 770 771 772
func (t *DescribeIndexReqTask) Execute(ctx context.Context) error {
	if t.Type() != commonpb.MsgType_DescribeIndex {
		return fmt.Errorf("describe index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
N
neza2017 已提交
773 774 775 776 777 778 779 780
	idx, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
	if err != nil {
		return err
	}
	for _, i := range idx {
		desc := &milvuspb.IndexDescription{
			IndexName: i.IndexName,
			Params:    i.IndexParams,
781
			IndexID:   i.IndexID,
N
neza2017 已提交
782 783 784 785 786 787
		}
		t.Rsp.IndexDescriptions = append(t.Rsp.IndexDescriptions, desc)
	}
	return nil
}

N
neza2017 已提交
788 789 790 791 792
type DropIndexReqTask struct {
	baseReqTask
	Req *milvuspb.DropIndexRequest
}

G
groot 已提交
793 794 795 796
func (t *DropIndexReqTask) Ctx() context.Context {
	return t.ctx
}

N
neza2017 已提交
797 798 799 800 801 802 803 804 805 806 807 808
func (t *DropIndexReqTask) Type() commonpb.MsgType {
	return t.Req.Base.MsgType
}

func (t *DropIndexReqTask) Ts() (typeutil.Timestamp, error) {
	return t.Req.Base.Timestamp, nil
}

func (t *DropIndexReqTask) IgnoreTimeStamp() bool {
	return false
}

G
groot 已提交
809
func (t *DropIndexReqTask) Execute(ctx context.Context) error {
N
neza2017 已提交
810 811 812
	if t.Type() != commonpb.MsgType_DropIndex {
		return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
	}
N
neza2017 已提交
813
	info, err := t.core.MetaTable.GetIndexByName(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
N
neza2017 已提交
814
	if err != nil {
N
neza2017 已提交
815
		log.Warn("GetIndexByName failed,", zap.String("collection name", t.Req.CollectionName), zap.String("field name", t.Req.FieldName), zap.String("index name", t.Req.IndexName), zap.Error(err))
N
neza2017 已提交
816 817
		return err
	}
N
neza2017 已提交
818 819
	if len(info) == 0 {
		return nil
N
neza2017 已提交
820
	}
N
neza2017 已提交
821
	if len(info) != 1 {
S
sunby 已提交
822
		return fmt.Errorf("len(index) = %d", len(info))
N
neza2017 已提交
823
	}
G
groot 已提交
824
	err = t.core.DropIndexReq(ctx, info[0].IndexID)
N
neza2017 已提交
825 826 827 828 829
	if err != nil {
		return err
	}
	_, _, err = t.core.MetaTable.DropIndex(t.Req.CollectionName, t.Req.FieldName, t.Req.IndexName)
	return err
N
neza2017 已提交
830 831
}

N
neza2017 已提交
832
type CreateIndexTask struct {
N
neza2017 已提交
833 834 835 836 837 838 839 840
	ctx               context.Context
	core              *Core
	segmentID         typeutil.UniqueID
	indexName         string
	indexID           typeutil.UniqueID
	fieldSchema       *schemapb.FieldSchema
	indexParams       []*commonpb.KeyValuePair
	isFromFlushedChan bool
N
neza2017 已提交
841 842 843 844 845 846
}

func (t *CreateIndexTask) BuildIndex() error {
	if t.core.MetaTable.IsSegmentIndexed(t.segmentID, t.fieldSchema, t.indexParams) {
		return nil
	}
N
neza2017 已提交
847
	rows, err := t.core.GetNumRowsReq(t.segmentID, t.isFromFlushedChan)
N
neza2017 已提交
848 849 850
	if err != nil {
		return err
	}
851 852 853 854 855 856 857 858 859
	var bldID typeutil.UniqueID = 0
	enableIdx := false
	if rows < Params.MinSegmentSizeToEnableIndex {
		log.Debug("num of is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows))
	} else {
		binlogs, err := t.core.GetBinlogFilePathsFromDataServiceReq(t.segmentID, t.fieldSchema.FieldID)
		if err != nil {
			return err
		}
G
groot 已提交
860
		bldID, err = t.core.BuildIndexReq(t.ctx, binlogs, t.fieldSchema.TypeParams, t.indexParams, t.indexID, t.indexName)
861 862 863 864
		if err != nil {
			return err
		}
		enableIdx = true
N
neza2017 已提交
865 866
	}
	seg := etcdpb.SegmentIndexInfo{
867 868 869 870 871
		SegmentID:   t.segmentID,
		FieldID:     t.fieldSchema.FieldID,
		IndexID:     t.indexID,
		BuildID:     bldID,
		EnableIndex: enableIdx,
N
neza2017 已提交
872
	}
N
neza2017 已提交
873
	err = t.core.MetaTable.AddIndex(&seg)
N
neza2017 已提交
874 875
	return err
}