segment.go 3.1 KB
Newer Older
1 2 3 4 5 6 7
package controller

import (
	"fmt"
	"strconv"
	"time"

8 9 10 11 12 13
	"github.com/zilliztech/milvus-distributed/internal/conf"
	"github.com/zilliztech/milvus-distributed/internal/master/collection"
	"github.com/zilliztech/milvus-distributed/internal/master/id"
	"github.com/zilliztech/milvus-distributed/internal/master/informer"
	"github.com/zilliztech/milvus-distributed/internal/master/kv"
	"github.com/zilliztech/milvus-distributed/internal/master/segment"
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
)

// SegmentStatsController consumes segment statistics and applies them to the
// metadata held in kvbase.
//
// It starts segment.Listener in its own goroutine, feeding a buffered channel,
// and then loops over the values delivered on that channel. For every value
// the result of ComputeCloseTime and then the result of UpdateSegmentStatus is
// sent on errch; nil results are sent as well, so the receiver sees two
// values per statistics message.
//
// If no statistics arrive within five seconds the function prints a message
// and returns. Nothing here stops the listener goroutine on return.
func SegmentStatsController(kvbase kv.Base, errch chan error) {
	const idleTimeout = 5 * time.Second

	statsCh := make(chan segment.SegmentStats, 10)
	// The client is constructed in this goroutine, before the listener starts.
	go segment.Listener(statsCh, informer.NewPulsarClient())

	for {
		// A fresh timer per iteration: the timeout measures idle time since
		// the last message was fully processed.
		idle := time.After(idleTimeout)
		select {
		case stats := <-statsCh:
			closeErr := ComputeCloseTime(stats, kvbase)
			errch <- closeErr
			statusErr := UpdateSegmentStatus(stats, kvbase)
			errch <- statusErr
		case <-idle:
			fmt.Println("wait for new request")
			return
		}
	}
}

// ComputeCloseTime schedules the closing of a segment that is nearly full and
// provisions its successor.
//
// Nothing happens (and nil is returned) unless ss.MemorySize exceeds 80% of
// conf.Config.Master.SegmentThreshole. Past that point the function:
//
//  1. divides the remaining 20% of the threshold by ss.MemoryRate to get a
//     number of seconds, and stores now+seconds (Unix time) as the segment's
//     CloseTimeStamp under "segment/<segment id>";
//  2. creates a new segment for the same collection and channel range and
//     saves it under "segment/<new id>";
//  3. appends the new segment ID to the collection's SegmentIDs and saves the
//     collection back under "collection/<collection id>".
//
// The first error encountered while loading, decoding, encoding or saving is
// returned unchanged. The steps are not atomic: an error in a later step
// leaves the earlier writes in place.
func ComputeCloseTime(ss segment.SegmentStats, kvbase kv.Base) error {
	threshold := conf.Config.Master.SegmentThreshole
	if int(ss.MemorySize) <= int(threshold*0.8) {
		return nil
	}

	currentTime := time.Now()

	// A rate that truncates to 0 would divide by zero; treat it as 1.
	memRate := int(ss.MemoryRate)
	if memRate == 0 {
		memRate = 1
	}
	sec := int(threshold*0.2) / memRate

	// Stamp the current segment with its projected close time.
	segKey := "segment/" + strconv.Itoa(int(ss.SegementID))
	data, err := kvbase.Load(segKey)
	if err != nil {
		return err
	}
	seg, err := segment.JSON2Segment(data)
	if err != nil {
		return err
	}
	seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix())
	fmt.Println(seg)
	updateData, err := segment.Segment2JSON(*seg)
	if err != nil {
		return err
	}
	if err = kvbase.Save(segKey, updateData); err != nil {
		return err
	}

	// Create the successor segment. The last argument is a far-future time
	// (2^36-1 seconds after the Unix epoch).
	// NOTE(review): presumably this means "close time not yet scheduled" — confirm.
	newSegID := id.New().Uint64()
	newSeg := segment.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0))
	newSegData, err := segment.Segment2JSON(newSeg)
	if err != nil {
		return err
	}
	if err = kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData); err != nil {
		return err
	}

	// Register the new segment with its collection. The collection must be
	// written back under the same key it was read from.
	collKey := "collection/" + strconv.Itoa(int(seg.CollectionID))
	c, err := kvbase.Load(collKey)
	if err != nil {
		return err
	}
	collectionMeta, err := collection.JSON2Collection(c)
	if err != nil {
		return err
	}
	collectionMeta.SegmentIDs = append(collectionMeta.SegmentIDs, newSegID)
	cData, err := collection.Collection2JSON(*collectionMeta)
	if err != nil {
		return err
	}
	if err = kvbase.Save(collKey, cData); err != nil {
		return err
	}
	return nil
}

// UpdateSegmentStatus copies the Status and Rows reported in ss onto the
// stored segment identified by ss.SegementID.
//
// The segment is loaded from "segment/<segment id>" in kvbase. If neither
// Status nor Rows differs from the stored values, nothing is written and nil
// is returned. Otherwise the updated segment is saved back under the same
// key it was loaded from.
//
// Any error from loading, decoding, encoding or saving is returned unchanged.
func UpdateSegmentStatus(ss segment.SegmentStats, kvbase kv.Base) error {
	// One key for both the read and the write, so the update always lands on
	// the record that was loaded.
	segKey := "segment/" + strconv.Itoa(int(ss.SegementID))

	segmentData, err := kvbase.Load(segKey)
	if err != nil {
		return err
	}
	seg, err := segment.JSON2Segment(segmentData)
	if err != nil {
		return err
	}

	// Skip the write when the stored segment is already up to date.
	if seg.Status == ss.Status && seg.Rows == ss.Rows {
		return nil
	}
	seg.Status = ss.Status
	seg.Rows = ss.Rows

	segData, err := segment.Segment2JSON(*seg)
	if err != nil {
		return err
	}
	if err = kvbase.Save(segKey, segData); err != nil {
		return err
	}
	return nil
}