task.go 2.9 KB
Newer Older
C
cai.zhang 已提交
1 2 3 4
package indexservice

import (
	"context"
T
ThreadDao 已提交
5
	"errors"
6 7

	"go.uber.org/zap"
C
cai.zhang 已提交
8

9
	"github.com/zilliztech/milvus-distributed/internal/allocator"
T
ThreadDao 已提交
10
	"github.com/zilliztech/milvus-distributed/internal/kv"
11
	"github.com/zilliztech/milvus-distributed/internal/log"
C
cai.zhang 已提交
12 13
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
14
	"github.com/zilliztech/milvus-distributed/internal/types"
C
cai.zhang 已提交
15 16
)

S
sunby 已提交
17 18 19 20
// IndexAddTaskName is the name reported by IndexAddTask.Name.
const IndexAddTaskName = "IndexAddTask"

C
cai.zhang 已提交
21
type task interface {
S
sunby 已提交
22
	Ctx() context.Context
C
cai.zhang 已提交
23 24
	ID() UniqueID       // return ReqID
	SetID(uid UniqueID) // set ReqID
S
sunby 已提交
25 26 27 28
	Name() string
	PreExecute(ctx context.Context) error
	Execute(ctx context.Context) error
	PostExecute(ctx context.Context) error
C
cai.zhang 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
	WaitToFinish() error
	Notify(err error)
	OnEnqueue() error
}

// BaseTask carries the state shared by tasks: the completion channel, the
// task context, the task ID and the meta table handle.
type BaseTask struct {
	done  chan error      // Notify sends the outcome here; WaitToFinish receives it
	ctx   context.Context // WaitToFinish stops waiting once this context is done
	id    UniqueID        // returned by ID, assigned by setID
	table *metaTable      // meta table; IndexAddTask.PreExecute calls AddIndex on it
}

// ID returns the ID currently stored on the task.
func (bt *BaseTask) ID() UniqueID {
	return bt.id
}

// setID stores id as the task's ID. It is unexported; IndexAddTask.SetID
// wraps it to provide the exported setter.
func (bt *BaseTask) setID(id UniqueID) {
	bt.id = id
}

// WaitToFinish blocks until either an outcome arrives on the done channel
// or the task context is done, whichever is observed first.
//
// It returns the value received from the done channel (nil on success), or
// a fixed "timeout" error when the context finishes first. That error is
// the same whether the context was cancelled or hit its deadline.
func (bt *BaseTask) WaitToFinish() error {
	ctxDone := bt.ctx.Done()
	select {
	case outcome := <-bt.done:
		return outcome
	case <-ctxDone:
		return errors.New("Task wait to finished timeout")
	}
}

// Notify publishes err (nil on success) as the task's outcome on the done
// channel, where WaitToFinish picks it up.
//
// WaitToFinish stops receiving once the task context is done, so a plain
// blocking send could then hang the notifying goroutine forever. Notify
// therefore gives up on the send when the context is done and nobody can
// take the value. When the task has no context the original behavior, an
// unconditional blocking send, is kept.
func (bt *BaseTask) Notify(err error) {
	// Deliver immediately whenever a receiver or a free buffer slot is
	// available, so an outcome is never dropped merely because the context
	// happens to be done as well.
	select {
	case bt.done <- err:
		return
	default:
	}

	// A nil channel never becomes ready in a select, so with a nil context
	// the select below degrades to a plain blocking send.
	var ctxDone <-chan struct{}
	if bt.ctx != nil {
		ctxDone = bt.ctx.Done()
	}

	select {
	case bt.done <- err:
	case <-ctxDone:
		// The waiter has stopped (or will stop) waiting; drop the outcome
		// instead of leaking this goroutine.
	}
}

type IndexAddTask struct {
	BaseTask
	req               *indexpb.BuildIndexRequest
65
	indexBuildID      UniqueID
66
	idAllocator       *allocator.GlobalIDAllocator
C
cai.zhang 已提交
67
	buildQueue        TaskQueue
G
godchen 已提交
68
	kv                kv.BaseKV
T
ThreadDao 已提交
69
	builderClient     types.IndexNode
C
cai.zhang 已提交
70 71 72 73
	nodeClients       *PriorityQueue
	buildClientNodeID UniqueID
}

S
sunby 已提交
74 75 76 77 78 79 80 81
// Ctx returns the context stored on the embedded BaseTask.
func (it *IndexAddTask) Ctx() context.Context {
	return it.ctx
}

// ID returns the task ID stored on the embedded BaseTask.
func (it *IndexAddTask) ID() UniqueID {
	return it.id
}

C
cai.zhang 已提交
82 83 84 85
// SetID stores ID as the task ID by delegating to BaseTask.setID.
func (it *IndexAddTask) SetID(ID UniqueID) {
	it.BaseTask.setID(ID)
}

S
sunby 已提交
86 87 88 89
// Name returns the constant IndexAddTaskName.
func (it *IndexAddTask) Name() string {
	return IndexAddTaskName
}

C
cai.zhang 已提交
90 91
func (it *IndexAddTask) OnEnqueue() error {
	var err error
92
	it.indexBuildID, err = it.idAllocator.AllocOne()
C
cai.zhang 已提交
93 94 95 96 97 98
	if err != nil {
		return err
	}
	return nil
}

S
sunby 已提交
99
func (it *IndexAddTask) PreExecute(ctx context.Context) error {
100
	log.Debug("pretend to check Index Req")
C
cai.zhang 已提交
101 102 103 104 105 106
	nodeID, builderClient := it.nodeClients.PeekClient()
	if builderClient == nil {
		return errors.New("IndexAddTask Service not available")
	}
	it.builderClient = builderClient
	it.buildClientNodeID = nodeID
107
	err := it.table.AddIndex(it.indexBuildID, it.req)
C
cai.zhang 已提交
108 109 110 111 112 113
	if err != nil {
		return err
	}
	return nil
}

S
sunby 已提交
114
func (it *IndexAddTask) Execute(ctx context.Context) error {
G
godchen 已提交
115
	it.req.IndexBuildID = it.indexBuildID
116
	log.Debug("before index ...")
G
godchen 已提交
117
	resp, err := it.builderClient.BuildIndex(ctx, it.req)
C
cai.zhang 已提交
118
	if err != nil {
119
		log.Debug("indexservice", zap.String("build index finish err", err.Error()))
C
cai.zhang 已提交
120 121
		return err
	}
122
	if resp.ErrorCode != commonpb.ErrorCode_Success {
C
cai.zhang 已提交
123 124 125 126 127 128
		return errors.New(resp.Reason)
	}
	it.nodeClients.IncPriority(it.buildClientNodeID, 1)
	return nil
}

S
sunby 已提交
129
func (it *IndexAddTask) PostExecute(ctx context.Context) error {
C
cai.zhang 已提交
130 131
	return nil
}