partition_task.go 5.2 KB
Newer Older
1 2 3 4 5
package master

import (
	"encoding/json"
	"errors"
Z
zhenshan.cao 已提交
6 7 8
	"log"
	"strconv"

9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
)

// partitionMetaPrefix is the key prefix used by the create- and drop-partition
// tasks in this file when they save a collection's JSON-encoded metadata to
// the key-value store; the key is completed with the decimal collection ID.
const partitionMetaPrefix = "partition/"

// createPartitionTask is the master task that handles a create-partition
// request. The embedded baseTask supplies the shared task state used by the
// methods in this file (the meta table `mt` and the key-value store `kvBase`).
type createPartitionTask struct {
	baseTask
	// req is the incoming request; the task's methods treat nil as an error.
	req *internalpb.CreatePartitionRequest
}

// dropPartitionTask is the master task that handles a drop-partition request.
// The embedded baseTask supplies the shared task state used by the methods in
// this file (the meta table `mt` and the key-value store `kvBase`).
type dropPartitionTask struct {
	baseTask
	// req is the incoming request; the task's methods treat nil as an error.
	req *internalpb.DropPartitionRequest
}

// hasPartitionTask is the master task that answers whether a partition exists
// in a collection. The embedded baseTask supplies the meta table `mt` that
// Execute queries.
type hasPartitionTask struct {
	baseTask
	// hasPartition holds the result; it is written by Execute.
	hasPartition bool
	// req is the incoming request; the task's methods treat nil as an error.
	req          *internalpb.HasPartitionRequest
}

// describePartitionTask is the master task that handles a describe-partition
// request.
type describePartitionTask struct {
	baseTask
	// description is the response built by Execute; it stays nil until
	// Execute has run successfully.
	description *servicepb.PartitionDescription
	// req is the incoming request; the task's methods treat nil as an error.
	req         *internalpb.DescribePartitionRequest
}

// showPartitionTask is the master task that handles a show-partitions
// request. The embedded baseTask supplies the meta table `mt` that Execute
// reads.
type showPartitionTask struct {
	baseTask
	// stringListResponse is the response built by Execute; it stays nil until
	// Execute has run successfully.
	stringListResponse *servicepb.StringListResponse
	// req is the incoming request; the task's methods treat nil as an error.
	req                *internalpb.ShowPartitionRequest
}

//////////////////////////////////////////////////////////////////////////
N
neza2017 已提交
45
func (t *createPartitionTask) Type() internalpb.MsgType {
46 47 48 49
	if t.req == nil {
		log.Printf("null request")
		return 0
	}
N
neza2017 已提交
50
	return t.req.MsgType
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
}

// Ts returns the timestamp of the create-partition request converted to
// Timestamp, or a "null request" error when the task has no request.
func (t *createPartitionTask) Ts() (Timestamp, error) {
	if req := t.req; req != nil {
		return Timestamp(req.Timestamp), nil
	}
	return 0, errors.New("null request")
}

func (t *createPartitionTask) Execute() error {
	if t.req == nil {
		return errors.New("null request")
	}

	partitionName := t.req.PartitionName
	collectionName := partitionName.CollectionName
	collectionMeta, err := t.mt.GetCollectionByName(collectionName)
	if err != nil {
		return err
	}

	collectionMeta.PartitionTags = append(collectionMeta.PartitionTags, partitionName.Tag)

C
cai.zhang 已提交
74
	collectionJSON, err := json.Marshal(&collectionMeta)
75
	if err != nil {
B
bigsheeper 已提交
76
		return err
77 78
	}

79
	collectionID := collectionMeta.Id
C
cai.zhang 已提交
80
	err = (*t.kvBase).Save(partitionMetaPrefix+strconv.FormatInt(collectionID, 10), string(collectionJSON))
81
	if err != nil {
B
bigsheeper 已提交
82
		return err
83 84 85 86 87 88
	}

	return nil
}

//////////////////////////////////////////////////////////////////////////
N
neza2017 已提交
89
func (t *dropPartitionTask) Type() internalpb.MsgType {
90 91 92 93
	if t.req == nil {
		log.Printf("null request")
		return 0
	}
N
neza2017 已提交
94
	return t.req.MsgType
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
}

// Ts returns the timestamp of the drop-partition request converted to
// Timestamp, or a "null request" error when the task has no request.
func (t *dropPartitionTask) Ts() (Timestamp, error) {
	if req := t.req; req != nil {
		return Timestamp(req.Timestamp), nil
	}
	return 0, errors.New("null request")
}

func (t *dropPartitionTask) Execute() error {
	if t.req == nil {
		return errors.New("null request")
	}

	partitionName := t.req.PartitionName
	collectionName := partitionName.CollectionName
	collectionMeta, err := t.mt.GetCollectionByName(collectionName)
	if err != nil {
		return err
	}

116
	err = t.mt.DeletePartition(collectionMeta.Id, partitionName.Tag)
117 118 119 120
	if err != nil {
		return err
	}

C
cai.zhang 已提交
121
	collectionJSON, err := json.Marshal(&collectionMeta)
122
	if err != nil {
B
bigsheeper 已提交
123
		return err
124 125
	}

126
	collectionID := collectionMeta.Id
C
cai.zhang 已提交
127
	err = (*t.kvBase).Save(partitionMetaPrefix+strconv.FormatInt(collectionID, 10), string(collectionJSON))
128
	if err != nil {
B
bigsheeper 已提交
129
		return err
130 131 132 133 134 135
	}

	return nil
}

//////////////////////////////////////////////////////////////////////////
N
neza2017 已提交
136
func (t *hasPartitionTask) Type() internalpb.MsgType {
137 138 139 140
	if t.req == nil {
		log.Printf("null request")
		return 0
	}
N
neza2017 已提交
141
	return t.req.MsgType
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
}

// Ts returns the timestamp of the has-partition request converted to
// Timestamp, or a "null request" error when the task has no request.
func (t *hasPartitionTask) Ts() (Timestamp, error) {
	if req := t.req; req != nil {
		return Timestamp(req.Timestamp), nil
	}
	return 0, errors.New("null request")
}

func (t *hasPartitionTask) Execute() error {
	if t.req == nil {
		return errors.New("null request")
	}

	partitionName := t.req.PartitionName
	collectionName := partitionName.CollectionName
G
godchen 已提交
158 159 160 161 162
	collectionMeta, err := t.mt.GetCollectionByName(collectionName)
	if err != nil {
		return err
	}

163
	t.hasPartition = t.mt.HasPartition(collectionMeta.Id, partitionName.Tag)
164 165 166 167 168

	return nil
}

//////////////////////////////////////////////////////////////////////////
N
neza2017 已提交
169
func (t *describePartitionTask) Type() internalpb.MsgType {
170 171 172 173
	if t.req == nil {
		log.Printf("null request")
		return 0
	}
N
neza2017 已提交
174
	return t.req.MsgType
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
}

// Ts returns the timestamp of the describe-partition request converted to
// Timestamp, or a "null request" error when the task has no request.
func (t *describePartitionTask) Ts() (Timestamp, error) {
	if req := t.req; req != nil {
		return Timestamp(req.Timestamp), nil
	}
	return 0, errors.New("null request")
}

func (t *describePartitionTask) Execute() error {
	if t.req == nil {
		return errors.New("null request")
	}

	partitionName := t.req.PartitionName

Z
zhenshan.cao 已提交
191
	description := servicepb.PartitionDescription{
192 193 194 195 196 197 198 199 200 201 202 203
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Name: partitionName,
	}

	t.description = &description

	return nil
}

//////////////////////////////////////////////////////////////////////////
N
neza2017 已提交
204
func (t *showPartitionTask) Type() internalpb.MsgType {
205 206 207 208
	if t.req == nil {
		log.Printf("null request")
		return 0
	}
N
neza2017 已提交
209
	return t.req.MsgType
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
}

// Ts returns the timestamp of the show-partitions request converted to
// Timestamp, or a "null request" error when the task has no request.
func (t *showPartitionTask) Ts() (Timestamp, error) {
	if req := t.req; req != nil {
		return Timestamp(req.Timestamp), nil
	}
	return 0, errors.New("null request")
}

func (t *showPartitionTask) Execute() error {
	if t.req == nil {
		return errors.New("null request")
	}

	partitions := make([]string, 0)
C
cai.zhang 已提交
225 226
	for _, collection := range t.mt.collID2Meta {
		partitions = append(partitions, collection.PartitionTags...)
227 228
	}

Z
zhenshan.cao 已提交
229
	stringListResponse := servicepb.StringListResponse{
230 231 232 233 234 235 236 237 238 239
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Values: partitions,
	}

	t.stringListResponse = &stringListResponse

	return nil
}