meta_table.go 10.9 KB
Newer Older
1 2 3
package master

import (
Z
zhenshan.cao 已提交
4 5 6
	"strconv"
	"sync"

Z
zhenshan.cao 已提交
7 8
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"

N
neza2017 已提交
9
	"github.com/golang/protobuf/proto"
10
	"github.com/zilliztech/milvus-distributed/internal/errors"
Z
zhenshan.cao 已提交
11
	"github.com/zilliztech/milvus-distributed/internal/kv"
N
neza2017 已提交
12
	pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
13 14
)

Z
zhenshan.cao 已提交
15 16
// UniqueID is an alias of typeutil.UniqueID. It is the id type used as the key
// of the tenant, proxy, collection and segment maps in this file, and is
// passed directly to strconv.FormatInt when building kv keys.
type UniqueID = typeutil.UniqueID

17
type metaTable struct {
Z
zhenshan.cao 已提交
18
	client        *kv.EtcdKV                       // client of a reliable kv service, i.e. etcd client
Z
zhenshan.cao 已提交
19 20 21 22 23
	tenantId2Meta map[UniqueID]pb.TenantMeta     // tenant id to tenant meta
	proxyId2Meta  map[UniqueID]pb.ProxyMeta      // proxy id to proxy meta
	collId2Meta   map[UniqueID]pb.CollectionMeta // collection id to collection meta
	collName2Id   map[string]UniqueID            // collection name to collection id
	segId2Meta    map[UniqueID]pb.SegmentMeta    // segment id to segment meta
N
neza2017 已提交
24 25 26 27

	tenantLock sync.RWMutex
	proxyLock  sync.RWMutex
	ddLock     sync.RWMutex
28 29
}

Z
zhenshan.cao 已提交
30
func NewMetaTable(kv *kv.EtcdKV) (*metaTable, error) {
G
godchen 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
	mt := &metaTable{
		client:     kv,
		tenantLock: sync.RWMutex{},
		proxyLock:  sync.RWMutex{},
		ddLock:     sync.RWMutex{},
	}
	err := mt.reloadFromKV()
	if err != nil {
		return nil, err
	}
	return mt, nil
}

func (mt *metaTable) reloadFromKV() error {

Z
zhenshan.cao 已提交
46 47 48 49 50
	mt.tenantId2Meta = make(map[UniqueID]pb.TenantMeta)
	mt.proxyId2Meta = make(map[UniqueID]pb.ProxyMeta)
	mt.collId2Meta = make(map[UniqueID]pb.CollectionMeta)
	mt.collName2Id = make(map[string]UniqueID)
	mt.segId2Meta = make(map[UniqueID]pb.SegmentMeta)
N
neza2017 已提交
51

G
godchen 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
	_, values, err := mt.client.LoadWithPrefix("tenant")
	if err != nil {
		return err
	}

	for _, value := range values {
		tenant_meta := pb.TenantMeta{}
		err := proto.Unmarshal([]byte(value), &tenant_meta)
		if err != nil {
			return err
		}
		mt.tenantId2Meta[tenant_meta.Id] = tenant_meta
	}

	_, values, err = mt.client.LoadWithPrefix("proxy")
	if err != nil {
		return err
	}

	for _, value := range values {
		proxy_meta := pb.ProxyMeta{}
		err = proto.Unmarshal([]byte(value), &proxy_meta)
		if err != nil {
			return err
		}
		mt.proxyId2Meta[proxy_meta.Id] = proxy_meta
	}

	_, values, err = mt.client.LoadWithPrefix("collection")
	if err != nil {
		return err
	}

	for _, value := range values {
		collection_meta := pb.CollectionMeta{}
		err = proto.Unmarshal([]byte(value), &collection_meta)
		if err != nil {
			return err
		}
		mt.collId2Meta[collection_meta.Id] = collection_meta
		mt.collName2Id[collection_meta.Schema.Name] = collection_meta.Id
	}

	_, values, err = mt.client.LoadWithPrefix("segment")
	if err != nil {
		return err
	}

	for _, value := range values {
		segment_meta := pb.SegmentMeta{}
		err = proto.Unmarshal([]byte(value), &segment_meta)
		if err != nil {
			return err
		}
		mt.segId2Meta[segment_meta.SegmentId] = segment_meta
	}

	return nil
N
neza2017 已提交
110 111 112 113 114 115 116 117 118 119
}

// mt.ddLock.Lock() before call this function
func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error {
	coll_bytes, err := proto.Marshal(coll)
	if err != nil {
		return err
	}
	mt.collId2Meta[coll.Id] = *coll
	mt.collName2Id[coll.Schema.Name] = coll.Id
G
godchen 已提交
120
	return mt.client.Save("/collection/"+strconv.FormatInt(coll.Id, 10), string(coll_bytes))
N
neza2017 已提交
121 122 123 124 125 126 127 128
}

// mt.ddLock.Lock() before call this function
func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error {
	seg_bytes, err := proto.Marshal(seg)
	if err != nil {
		return err
	}
G
godchen 已提交
129 130 131 132 133 134 135

	mt.segId2Meta[seg.SegmentId] = *seg

	return mt.client.Save("/segment/"+strconv.FormatInt(seg.SegmentId, 10), string(seg_bytes))
}

// mt.ddLock.Lock() before call this function
Z
zhenshan.cao 已提交
136
func (mt *metaTable) deleteSegmentMeta(segId UniqueID) error {
G
godchen 已提交
137 138 139 140 141 142 143 144 145 146
	_, ok := mt.segId2Meta[segId]

	if ok {
		delete(mt.segId2Meta, segId)
	}

	return mt.client.Remove("/segment/" + strconv.FormatInt(segId, 10))
}

// mt.ddLock.Lock() before call this function
Z
zhenshan.cao 已提交
147
func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIds []UniqueID) error {
G
godchen 已提交
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
	segIdStrs := make([]string, 0, len(segIds))
	for _, segId := range segIds {
		segIdStrs = append(segIdStrs, "/segment/"+strconv.FormatInt(segId, 10))
	}

	kvs := make(map[string]string)
	collStrs, err := proto.Marshal(coll)
	if err != nil {
		return err
	}

	kvs["/collection/"+strconv.FormatInt(coll.Id, 10)] = string(collStrs)

	for _, segId := range segIds {
		_, ok := mt.segId2Meta[segId]

		if ok {
			delete(mt.segId2Meta, segId)
		}
	}

	mt.collId2Meta[coll.Id] = *coll

	return mt.client.MultiSaveAndRemove(kvs, segIdStrs)
}

// mt.ddLock.Lock() before call this function
func (mt *metaTable) saveCollectionsAndSegmentsMeta(coll *pb.CollectionMeta, seg *pb.SegmentMeta) error {
	kvs := make(map[string]string, 0)
	coll_bytes, err := proto.Marshal(coll)
N
neza2017 已提交
178 179
	if err != nil {
		return err
180
	}
G
godchen 已提交
181 182 183 184 185 186 187 188 189 190 191
	kvs["/collection/"+strconv.FormatInt(coll.Id, 10)] = string(coll_bytes)

	mt.collId2Meta[coll.Id] = *coll
	mt.collName2Id[coll.Schema.Name] = coll.Id

	seg_bytes, err := proto.Marshal(seg)
	if err != nil {
		return err
	}
	kvs["/segment/"+strconv.FormatInt(seg.SegmentId, 10)] = string(seg_bytes)

N
neza2017 已提交
192
	mt.segId2Meta[seg.SegmentId] = *seg
G
godchen 已提交
193 194 195 196 197

	return mt.client.MultiSave(kvs)
}

// mt.ddLock.Lock() before call this function
Z
zhenshan.cao 已提交
198
func (mt *metaTable) deleteCollectionsAndSegmentsMeta(collId UniqueID, segIds []UniqueID) error {
G
godchen 已提交
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
	collIdStr := "/collection/" + strconv.FormatInt(collId, 10)

	totalIdStrs := make([]string, 0, 1+len(segIds))
	totalIdStrs = append(totalIdStrs, collIdStr)
	for _, singleId := range segIds {
		totalIdStrs = append(totalIdStrs, "/segment/"+strconv.FormatInt(singleId, 10))
	}

	coll_meta, ok := mt.collId2Meta[collId]

	if ok {
		delete(mt.collId2Meta, collId)
	}

	_, ok = mt.collName2Id[coll_meta.Schema.Name]

	if ok {
		delete(mt.collName2Id, coll_meta.Schema.Name)
	}

	for _, segId := range segIds {
		_, ok := mt.segId2Meta[segId]

		if ok {
			delete(mt.segId2Meta, segId)
		}
	}

	return mt.client.MultiRemove(totalIdStrs)
N
neza2017 已提交
228 229 230 231 232
}

func (mt *metaTable) AddCollection(coll *pb.CollectionMeta) error {
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
G
godchen 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
	if len(coll.SegmentIds) != 0 {
		return errors.Errorf("segment should be empty when creating collection")
	}
	if len(coll.PartitionTags) != 0 {
		return errors.Errorf("segment should be empty when creating collection")
	}
	_, ok := mt.collName2Id[coll.Schema.Name]
	if ok {
		return errors.Errorf("collection alread exists with name = " + coll.Schema.Name)
	}
	err := mt.saveCollectionMeta(coll)
	if err != nil {
		_ = mt.reloadFromKV()
		return err
	}
	return nil
}

Z
zhenshan.cao 已提交
251
func (mt *metaTable) DeleteCollection(collId UniqueID) error {
G
godchen 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	coll_meta, ok := mt.collId2Meta[collId]
	if !ok {
		return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collId, 10))
	}

	err := mt.deleteCollectionsAndSegmentsMeta(collId, coll_meta.SegmentIds)
	if err != nil {
		_ = mt.reloadFromKV()
		return err
	}
	return nil
}

Z
zhenshan.cao 已提交
268
func (mt *metaTable) HasCollection(collId UniqueID) bool {
G
godchen 已提交
269 270 271 272 273 274 275
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	_, ok := mt.collId2Meta[collId]
	if !ok {
		return false
	}
	return true
N
neza2017 已提交
276 277 278 279 280
}

func (mt *metaTable) GetCollectionByName(collectionName string) (*pb.CollectionMeta, error) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
281

N
neza2017 已提交
282 283 284 285 286 287 288 289 290
	vid, ok := mt.collName2Id[collectionName]
	if !ok {
		return nil, errors.Errorf("can't find collection: " + collectionName)
	}
	col, ok := mt.collId2Meta[vid]
	if !ok {
		return nil, errors.Errorf("can't find collection: " + collectionName)
	}
	return &col, nil
291 292
}

Z
zhenshan.cao 已提交
293
func (mt *metaTable) AddPartition(collId UniqueID, tag string) error {
G
godchen 已提交
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	coll, ok := mt.collId2Meta[collId]
	if !ok {
		return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collId, 10))
	}

	for _, t := range coll.PartitionTags {
		if t == tag {
			return errors.Errorf("partition already exists.")
		}
	}
	coll.PartitionTags = append(coll.PartitionTags, tag)

	err := mt.saveCollectionMeta(&coll)
N
neza2017 已提交
309
	if err != nil {
G
godchen 已提交
310 311 312 313 314 315
		_ = mt.reloadFromKV()
		return err
	}
	return nil
}

Z
zhenshan.cao 已提交
316
func (mt *metaTable) HasPartition(collId UniqueID, tag string) bool {
G
godchen 已提交
317 318 319 320
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	col, ok := mt.collId2Meta[collId]
	if !ok {
N
neza2017 已提交
321 322
		return false
	}
G
godchen 已提交
323 324
	for _, partitionTag := range col.PartitionTags {
		if partitionTag == tag {
N
neza2017 已提交
325
			return true
326 327
		}
	}
N
neza2017 已提交
328
	return false
329 330
}

Z
zhenshan.cao 已提交
331
func (mt *metaTable) DeletePartition(collId UniqueID, tag string) error {
N
neza2017 已提交
332 333 334
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

G
godchen 已提交
335
	coll_meta, ok := mt.collId2Meta[collId]
N
neza2017 已提交
336
	if !ok {
G
godchen 已提交
337
		return errors.Errorf("can't find collection. id = " + strconv.FormatInt(collId, 10))
N
neza2017 已提交
338
	}
G
godchen 已提交
339 340 341 342

	pt := make([]string, 0, len(coll_meta.PartitionTags))
	for _, t := range coll_meta.PartitionTags {
		if t != tag {
N
neza2017 已提交
343 344 345
			pt = append(pt, t)
		}
	}
G
godchen 已提交
346
	if len(pt) == len(coll_meta.PartitionTags) {
N
neza2017 已提交
347 348 349
		return nil
	}

Z
zhenshan.cao 已提交
350 351
	to_delete_seg := make([]UniqueID, 0, len(coll_meta.SegmentIds))
	seg := make([]UniqueID, 0, len(coll_meta.SegmentIds))
G
godchen 已提交
352
	for _, s := range coll_meta.SegmentIds {
N
neza2017 已提交
353 354 355 356
		sm, ok := mt.segId2Meta[s]
		if !ok {
			return errors.Errorf("can't find segment id = %d", s)
		}
G
godchen 已提交
357
		if sm.PartitionTag != tag {
N
neza2017 已提交
358
			seg = append(seg, s)
G
godchen 已提交
359 360
		} else {
			to_delete_seg = append(to_delete_seg, s)
361 362
		}
	}
G
godchen 已提交
363 364
	coll_meta.PartitionTags = pt
	coll_meta.SegmentIds = seg
N
neza2017 已提交
365

G
godchen 已提交
366 367 368 369 370 371
	err := mt.saveCollectionAndDeleteSegmentsMeta(&coll_meta, to_delete_seg)
	if err != nil {
		_ = mt.reloadFromKV()
		return err
	}
	return nil
N
neza2017 已提交
372
}
373

N
neza2017 已提交
374 375 376
func (mt *metaTable) AddSegment(seg *pb.SegmentMeta) error {
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
G
godchen 已提交
377 378 379 380 381 382 383 384 385
	collId := seg.CollectionId
	coll_meta := mt.collId2Meta[collId]
	coll_meta.SegmentIds = append(coll_meta.SegmentIds, seg.SegmentId)
	err := mt.saveCollectionsAndSegmentsMeta(&coll_meta, seg)
	if err != nil {
		_ = mt.reloadFromKV()
		return err
	}
	return nil
N
neza2017 已提交
386 387
}

Z
zhenshan.cao 已提交
388
func (mt *metaTable) GetSegmentById(segId UniqueID) (*pb.SegmentMeta, error) {
N
neza2017 已提交
389 390 391 392 393 394 395 396
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	sm, ok := mt.segId2Meta[segId]
	if !ok {
		return nil, errors.Errorf("can't find segment id = %d", segId)
	}
	return &sm, nil
397
}
G
godchen 已提交
398

Z
zhenshan.cao 已提交
399
func (mt *metaTable) DeleteSegment(segId UniqueID) error {
G
godchen 已提交
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	seg_meta, ok := mt.segId2Meta[segId]
	if !ok {
		return errors.Errorf("can't find segment. id = " + strconv.FormatInt(segId, 10))
	}

	coll_meta, ok := mt.collId2Meta[seg_meta.CollectionId]
	if !ok {
		return errors.Errorf("can't find collection. id = " + strconv.FormatInt(seg_meta.CollectionId, 10))
	}

	for i := 0; i < len(coll_meta.SegmentIds); i++ {
		if coll_meta.SegmentIds[i] == segId {
			coll_meta.SegmentIds = append(coll_meta.SegmentIds[:i], coll_meta.SegmentIds[i+1:]...)
		}
	}

Z
zhenshan.cao 已提交
419
	err := mt.saveCollectionAndDeleteSegmentsMeta(&coll_meta, []UniqueID{segId})
G
godchen 已提交
420 421 422 423 424 425 426
	if err != nil {
		_ = mt.reloadFromKV()
		return err
	}
	return nil

}
Z
zhenshan.cao 已提交
427
func (mt *metaTable) CloseSegment(segId UniqueID, closeTs Timestamp, num_rows int64) error {
G
godchen 已提交
428 429 430 431 432 433 434 435
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	seg_meta, ok := mt.segId2Meta[segId]
	if !ok {
		return errors.Errorf("can't find segment id = " + strconv.FormatInt(segId, 10))
	}

Z
zhenshan.cao 已提交
436
	seg_meta.CloseTime = closeTs
G
godchen 已提交
437 438 439 440 441 442 443 444 445
	seg_meta.NumRows = num_rows

	err := mt.saveSegmentMeta(&seg_meta)
	if err != nil {
		_ = mt.reloadFromKV()
		return err
	}
	return nil
}