提交 3bd6c541 编写于 作者: S shengjh 提交者: yefu.chen

Fix core dump

Signed-off-by: Nshengjh <1572099106@qq.com>
上级 90cd86de
package collection
import (
"time"
masterpb "github.com/czs007/suvlim/pkg/master/grpc/master"
messagepb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/gogo/protobuf/proto"
jsoniter "github.com/json-iterator/go"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Collection struct {
ID uint64 `json:"id"`
Name string `json:"name"`
CreateTime uint64 `json:"creat_time"`
Schema []FieldMeta `json:"schema"`
// ExtraSchema []FieldMeta `json:"extra_schema"`
SegmentIDs []uint64 `json:"segment_ids"`
PartitionTags []string `json:"partition_tags"`
GrpcMarshalString string `json:"grpc_marshal_string"`
IndexParam []*messagepb.IndexParam `json:"index_param"`
}
type FieldMeta struct {
FieldName string `json:"field_name"`
Type messagepb.DataType `json:"type"`
DIM int64 `json:"dimension"`
}
func GrpcMarshal(c *Collection) *Collection {
if c.GrpcMarshalString != "" {
c.GrpcMarshalString = ""
}
pbSchema := &messagepb.Schema{
FieldMetas: []*messagepb.FieldMeta{},
}
schemaSlice := []*messagepb.FieldMeta{}
for _, v := range c.Schema {
newpbMeta := &messagepb.FieldMeta{
FieldName: v.FieldName,
Type: v.Type,
Dim: v.DIM,
}
schemaSlice = append(schemaSlice, newpbMeta)
}
pbSchema.FieldMetas = schemaSlice
grpcCollection := &masterpb.Collection{
Id: c.ID,
Name: c.Name,
Schema: pbSchema,
CreateTime: c.CreateTime,
SegmentIds: c.SegmentIDs,
PartitionTags: c.PartitionTags,
Indexes: c.IndexParam,
}
out := proto.MarshalTextString(grpcCollection)
c.GrpcMarshalString = out
return c
}
func NewCollection(id uint64, name string, createTime time.Time,
schema []*messagepb.FieldMeta, sIds []uint64, ptags []string) Collection {
segementIDs := []uint64{}
newSchema := []FieldMeta{}
for _, v := range schema {
newSchema = append(newSchema, FieldMeta{FieldName: v.FieldName, Type: v.Type, DIM: v.Dim})
}
for _, sid := range sIds {
segementIDs = append(segementIDs, sid)
}
return Collection{
ID: id,
Name: name,
CreateTime: uint64(createTime.Unix()),
Schema: newSchema,
SegmentIDs: segementIDs,
PartitionTags: ptags,
}
}
func Collection2JSON(c Collection) (string, error) {
b, err := json.Marshal(&c)
if err != nil {
return "", err
}
return string(b), nil
}
func JSON2Collection(s string) (*Collection, error) {
var c Collection
err := json.Unmarshal([]byte(s), &c)
if err != nil {
return &Collection{}, err
}
return &c, nil
}
package collection
import (
"testing"
"time"
messagepb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/stretchr/testify/assert"
)
var (
cid = uint64(10011111234)
name = "test-segment"
createTime = time.Now()
schema = []*messagepb.FieldMeta{}
sIds = []uint64{111111, 222222}
ptags = []string{"default", "test"}
)
func TestNewCollection(t *testing.T) {
assert := assert.New(t)
c := NewCollection(cid, name, createTime, schema, sIds, ptags)
assert.Equal(cid, c.ID)
assert.Equal(name, c.Name)
for k, v := range schema {
assert.Equal(v.Dim, c.Schema[k].DIM)
assert.Equal(v.FieldName, c.Schema[k].FieldName)
assert.Equal(v.Type, c.Schema[k].Type)
}
assert.Equal(sIds, c.SegmentIDs)
assert.Equal(ptags, c.PartitionTags)
}
func TestGrpcMarshal(t *testing.T) {
assert := assert.New(t)
c := NewCollection(cid, name, createTime, schema, sIds, ptags)
newc := GrpcMarshal(&c)
assert.NotEqual("", newc.GrpcMarshalString)
}
package common
//const (
// PULSAR_URL = "pulsar://localhost:6650"
// PULSAR_MONITER_INTERVAL = 1 * time.Second
// PULSAR_TOPIC = "monitor-topic"
// ETCD_ROOT_PATH = "by-dev"
// SEGMENT_THRESHOLE = 10000
// DEFAULT_GRPC_PORT = ":53100"
//)
package controller
import (
"log"
"strconv"
"time"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/pkg/master/collection"
messagepb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/czs007/suvlim/pkg/master/id"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/pkg/master/segment"
)
func CollectionController(ch chan *messagepb.Mapping, kvbase kv.Base, errch chan error) {
for collectionMeta := range ch {
sID := id.New().Uint64()
cID := id.New().Uint64()
s2ID := id.New().Uint64()
fieldMetas := []*messagepb.FieldMeta{}
if collectionMeta.Schema != nil {
fieldMetas = collectionMeta.Schema.FieldMetas
}
c := collection.NewCollection(cID, collectionMeta.CollectionName,
time.Now(), fieldMetas, []uint64{sID, s2ID},
[]string{"default"})
cm := collection.GrpcMarshal(&c)
s := segment.NewSegment(sID, cID, collectionMeta.CollectionName, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0))
s2 := segment.NewSegment(s2ID, cID, collectionMeta.CollectionName, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0))
collectionData, _ := collection.Collection2JSON(*cm)
segmentData, err := segment.Segment2JSON(s)
if err != nil {
log.Fatal(err)
}
s2Data, err := segment.Segment2JSON(s2)
if err != nil {
log.Fatal(err)
}
err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData)
if err != nil {
log.Fatal(err)
}
err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData)
if err != nil {
log.Fatal(err)
}
err = kvbase.Save("segment/"+strconv.FormatUint(s2ID, 10), s2Data)
if err != nil {
log.Fatal(err)
}
}
}
func WriteCollection2Datastore(collectionMeta *messagepb.Mapping, kvbase kv.Base) error {
sID := id.New().Uint64()
cID := id.New().Uint64()
fieldMetas := []*messagepb.FieldMeta{}
if collectionMeta.Schema != nil {
fieldMetas = collectionMeta.Schema.FieldMetas
}
c := collection.NewCollection(cID, collectionMeta.CollectionName,
time.Now(), fieldMetas, []uint64{sID},
[]string{"default"})
cm := collection.GrpcMarshal(&c)
s := segment.NewSegment(sID, cID, collectionMeta.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0))
collectionData, err := collection.Collection2JSON(*cm)
if err != nil {
log.Fatal(err)
return err
}
segmentData, err := segment.Segment2JSON(s)
if err != nil {
log.Fatal(err)
return err
}
err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData)
if err != nil {
log.Fatal(err)
return err
}
err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData)
if err != nil {
log.Fatal(err)
return err
}
return nil
}
func UpdateCollectionIndex(index *messagepb.IndexParam, kvbase kv.Base) error {
collectionName := index.CollectionName
collectionJSON, err := kvbase.Load("collection/" + collectionName)
if err != nil {
return err
}
c, err := collection.JSON2Collection(collectionJSON)
if err != nil {
return err
}
for k, v := range c.IndexParam {
if v.IndexName == index.IndexName {
c.IndexParam[k] = v
}
}
c.IndexParam = append(c.IndexParam, index)
cm := collection.GrpcMarshal(c)
collectionData, err := collection.Collection2JSON(*cm)
if err != nil {
return err
}
err = kvbase.Save("collection/"+collectionName, collectionData)
if err != nil {
return err
}
return nil
}
package controller
import (
"fmt"
"strconv"
"time"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/pkg/master/collection"
"github.com/czs007/suvlim/pkg/master/id"
"github.com/czs007/suvlim/pkg/master/informer"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/pkg/master/segment"
)
func SegmentStatsController(kvbase kv.Base, errch chan error) {
ssChan := make(chan segment.SegmentStats, 10)
ssClient := informer.NewPulsarClient()
go segment.Listener(ssChan, ssClient)
for {
select {
case ss := <-ssChan:
errch <- ComputeCloseTime(ss, kvbase)
errch <- UpdateSegmentStatus(ss, kvbase)
case <-time.After(5 * time.Second):
fmt.Println("wait for new request")
return
}
}
}
func ComputeCloseTime(ss segment.SegmentStats, kvbase kv.Base) error {
if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) {
currentTime := time.Now()
memRate := int(ss.MemoryRate)
if memRate == 0 {
memRate = 1
}
sec := int(conf.Config.Master.SegmentThreshole*0.2) / memRate
data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID)))
if err != nil {
return err
}
seg, err := segment.JSON2Segment(data)
if err != nil {
return err
}
seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix())
fmt.Println(seg)
updateData, err := segment.Segment2JSON(*seg)
if err != nil {
return err
}
kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData)
//create new segment
newSegID := id.New().Uint64()
newSeg := segment.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0))
newSegData, err := segment.Segment2JSON(*&newSeg)
if err != nil {
return err
}
//save to kv store
kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData)
// update collection data
c, _ := kvbase.Load("collection/" + strconv.Itoa(int(seg.CollectionID)))
collectionMeta, err := collection.JSON2Collection(c)
if err != nil {
return err
}
segIDs := collectionMeta.SegmentIDs
segIDs = append(segIDs, newSegID)
collectionMeta.SegmentIDs = segIDs
cData, err := collection.Collection2JSON(*collectionMeta)
if err != nil {
return err
}
kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), cData)
}
return nil
}
func UpdateSegmentStatus(ss segment.SegmentStats, kvbase kv.Base) error {
segmentData, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID)))
if err != nil {
return err
}
seg, err := segment.JSON2Segment(segmentData)
if err != nil {
return err
}
var changed bool
changed = false
if seg.Status != ss.Status {
changed = true
seg.Status = ss.Status
}
if seg.Rows != ss.Rows {
changed = true
seg.Rows = ss.Rows
}
if changed {
segData, err := segment.Segment2JSON(*seg)
if err != nil {
return err
}
err = kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), segData)
if err != nil {
return err
}
}
return nil
}
package controller
import (
"strconv"
"testing"
"time"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/pkg/master/segment"
"go.etcd.io/etcd/clientv3"
)
func newKvBase() kv.Base {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
return kvbase
}
func TestComputeClosetTime(t *testing.T) {
kvbase := newKvBase()
var news segment.SegmentStats
for i := 0; i < 10; i++ {
news = segment.SegmentStats{
SegementID: uint64(6875940398055133887),
MemorySize: uint64(i * 1000),
MemoryRate: 0.9,
}
ComputeCloseTime(news, kvbase)
}
}
package grpc
import (
"context"
"fmt"
"net"
"strconv"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/pkg/master/controller"
masterpb "github.com/czs007/suvlim/pkg/master/grpc/master"
"github.com/czs007/suvlim/pkg/master/grpc/message"
messagepb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/czs007/suvlim/pkg/master/kv"
"google.golang.org/grpc"
)
func Server(ch chan *messagepb.Mapping, errch chan error, kvbase kv.Base) {
defaultGRPCPort := ":"
defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10)
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
// log.Fatal("failed to listen: %v", err)
errch <- err
return
}
s := grpc.NewServer()
masterpb.RegisterMasterServer(s, GRPCMasterServer{CreateRequest: ch, kvbase: kvbase})
if err := s.Serve(lis); err != nil {
// log.Fatalf("failed to serve: %v", err)
errch <- err
return
}
}
type GRPCMasterServer struct {
CreateRequest chan *messagepb.Mapping
kvbase kv.Base
}
func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *messagepb.Mapping) (*messagepb.Status, error) {
// ms.CreateRequest <- in2
fmt.Println("Handle a new create collection request")
err := controller.WriteCollection2Datastore(in, ms.kvbase)
if err != nil {
return &messagepb.Status{
ErrorCode: 100,
Reason: "",
}, err
}
return &messagepb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexParam) (*message.Status, error) {
fmt.Println("Handle a new create index request")
err := controller.UpdateCollectionIndex(in, ms.kvbase)
if err != nil {
return &messagepb.Status{
ErrorCode: 100,
Reason: "",
}, err
}
return &messagepb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
package informer
import (
"context"
"fmt"
"github.com/czs007/suvlim/conf"
"log"
"strconv"
"time"
"github.com/czs007/suvlim/conf"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/pkg/master/mock"
)
func NewPulsarClient() PulsarClient {
......@@ -32,3 +34,30 @@ func NewPulsarClient() PulsarClient {
type PulsarClient struct {
Client pulsar.Client
}
func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error {
consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{
Topic: conf.Config.Master.PulsarTopic,
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
for {
msg, err := consumer.Receive(context.TODO())
if err != nil {
log.Fatal(err)
}
m, _ := mock.SegmentUnMarshal(msg.Payload())
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), m.SegementID)
ssChan <- m
consumer.Ack(msg)
}
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
return nil
}
......@@ -34,22 +34,23 @@ func NewEtcdKVBase(client *clientv3.Client, rootPath string) *EtcdKVBase {
}
}
func (kv *EtcdKVBase) Close() {
func (kv *EtcdKVBase) Close(){
kv.client.Close()
}
func (kv *EtcdKVBase) LoadWithPrefix(key string) ([]string, []string) {
func (kv *EtcdKVBase) LoadWithPrefix(key string) ( []string, []string) {
key = path.Join(kv.rootPath, key)
println("in loadWithPrefix,", key)
resp, err := etcdutil.EtcdKVGet(kv.client, key, clientv3.WithPrefix())
resp, err := etcdutil.EtcdKVGet(kv.client, key,clientv3.WithPrefix())
if err != nil {
return []string{}, []string{}
return [] string {}, [] string {}
}
var keys []string
var values []string
for _, kvs := range resp.Kvs {
for _,kvs := range resp.Kvs{
//println(len(kvs.))
if len(kvs.Key) <= 0 {
if len(kvs.Key) <= 0{
println("KKK")
continue
}
......
package main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/pkg/master/mock"
)
func main() {
FakeProduecer()
}
func FakeProduecer() {
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: pulsarAddr,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: conf.Config.Master.PulsarTopic,
})
testSegmentStats, _ := mock.SegmentMarshal(mock.SegmentStats{
SegementID: uint64(6875939483227099806),
MemorySize: uint64(9999),
MemoryRate: float64(0.13),
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: testSegmentStats,
})
time.Sleep(1 * time.Second)
defer producer.Close()
if err != nil {
fmt.Errorf("%v", err)
}
fmt.Println("Published message")
}
......@@ -3,12 +3,11 @@ package mock
import (
"context"
"fmt"
"github.com/czs007/suvlim/conf"
"log"
"strconv"
"time"
"github.com/czs007/suvlim/conf"
"github.com/apache/pulsar-client-go/pulsar"
)
......@@ -30,8 +29,8 @@ func FakePulsarProducer() {
Topic: conf.Config.Master.PulsarTopic,
})
testSegmentStats, _ := SegmentMarshal(SegmentStats{
SegementID: uint64(6875875531164062448),
MemorySize: uint64(9999),
SegementID: uint64(1111),
MemorySize: uint64(333322),
MemoryRate: float64(0.13),
})
for {
......
package mock
import (
"context"
"fmt"
"log"
"strconv"
"testing"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/conf"
)
func TestSegmentMarshal(t *testing.T) {
s := SegmentStats{
SegementID: uint64(6875873667148255882),
MemorySize: uint64(9999),
SegementID: uint64(12315),
MemorySize: uint64(233113),
MemoryRate: float64(0.13),
}
......@@ -53,38 +47,3 @@ func TestSegment2JSON(t *testing.T) {
}
fmt.Println(res)
}
func TestFakePulsarProducer(t *testing.T) {
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: pulsarAddr,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: conf.Config.Master.PulsarTopic,
})
testSegmentStats, _ := SegmentMarshal(SegmentStats{
SegementID: uint64(6875875531164062448),
MemorySize: uint64(9999),
MemoryRate: float64(0.13),
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: testSegmentStats,
})
time.Sleep(1 * time.Second)
defer producer.Close()
if err != nil {
t.Error(err)
}
fmt.Println("Published message")
}
package segment
import (
"time"
masterpb "github.com/czs007/suvlim/pkg/master/grpc/master"
jsoniter "github.com/json-iterator/go"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Segment struct {
SegmentID uint64 `json:"segment_id"`
CollectionID uint64 `json:"collection_id"`
PartitionTag string `json:"partition_tag"`
ChannelStart int `json:"channel_start"`
ChannelEnd int `json:"channel_end"`
OpenTimeStamp uint64 `json:"open_timestamp"`
CloseTimeStamp uint64 `json:"close_timestamp"`
CollectionName string `json:"collection_name"`
Status masterpb.SegmentStatus `json:"segment_status"`
Rows int64 `json:"rows"`
}
func NewSegment(id uint64, collectioID uint64, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment {
return Segment{
SegmentID: id,
CollectionID: collectioID,
CollectionName: cName,
PartitionTag: ptag,
ChannelStart: chStart,
ChannelEnd: chEnd,
OpenTimeStamp: uint64(openTime.Unix()),
CloseTimeStamp: uint64(closeTime.Unix()),
}
}
func Segment2JSON(s Segment) (string, error) {
b, err := json.Marshal(&s)
if err != nil {
return "", err
}
return string(b), nil
}
func JSON2Segment(s string) (*Segment, error) {
var c Segment
err := json.Unmarshal([]byte(s), &c)
if err != nil {
return &Segment{}, err
}
return &c, nil
}
package segment
import (
"bytes"
"context"
"encoding/gob"
"fmt"
"log"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/conf"
masterpb "github.com/czs007/suvlim/pkg/master/grpc/master"
"github.com/czs007/suvlim/pkg/master/informer"
)
type SegmentStats struct {
SegementID uint64
MemorySize uint64
MemoryRate float64
Status masterpb.SegmentStatus
Rows int64
}
func SegmentMarshal(s SegmentStats) ([]byte, error) {
var nb bytes.Buffer
enc := gob.NewEncoder(&nb)
err := enc.Encode(s)
if err != nil {
return []byte{}, err
}
return nb.Bytes(), nil
}
func SegmentUnMarshal(data []byte) (SegmentStats, error) {
var ss SegmentStats
dec := gob.NewDecoder(bytes.NewBuffer(data))
err := dec.Decode(&ss)
if err != nil {
return SegmentStats{}, err
}
return ss, nil
}
func Listener(ssChan chan SegmentStats, pc informer.PulsarClient) error {
consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{
Topic: conf.Config.Master.PulsarTopic,
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
for {
msg, err := consumer.Receive(context.TODO())
if err != nil {
log.Fatal(err)
}
m, _ := SegmentUnMarshal(msg.Payload())
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), m.SegementID)
ssChan <- m
consumer.Ack(msg)
}
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
return nil
}
package master
import (
"context"
"fmt"
"log"
"net"
"strconv"
"time"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/pkg/master/controller"
milvusgrpc "github.com/czs007/suvlim/pkg/master/grpc"
pb "github.com/czs007/suvlim/pkg/master/grpc/master"
"github.com/czs007/suvlim/pkg/master/grpc/message"
messagepb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/czs007/suvlim/pkg/master/id"
"github.com/czs007/suvlim/pkg/master/informer"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/pkg/master/mock"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
func Run() {
kvbase := newKvBase()
go mock.FakePulsarProducer()
go SegmentStatsController()
collectionChan := make(chan *messagepb.Mapping)
defer close(collectionChan)
go GRPCServer(collectionChan)
go CollectionController(collectionChan)
for {
}
}
errorch := make(chan error)
defer close(errorch)
func SegmentStatsController() {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
go milvusgrpc.Server(collectionChan, errorch, kvbase)
go controller.SegmentStatsController(kvbase, errorch)
go controller.CollectionController(collectionChan, kvbase, errorch)
ssChan := make(chan mock.SegmentStats, 10)
defer close(ssChan)
ssClient := informer.NewPulsarClient()
go ssClient.Listener(ssChan)
for {
for v := range errorch {
log.Fatal(v)
select {
case ss := <-ssChan:
ComputeCloseTime(ss, kvbase)
UpdateSegmentStatus(ss, kvbase)
case <-time.After(5 * time.Second):
fmt.Println("timeout")
return
}
}
}
func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error {
if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) {
currentTime := time.Now()
memRate := int(ss.MemoryRate)
if memRate == 0 {
memRate = 1
}
sec := int(conf.Config.Master.SegmentThreshole*0.2) / memRate
data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID)))
if err != nil {
return err
}
seg, err := mock.JSON2Segment(data)
if err != nil {
return err
}
seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix())
fmt.Println("memRate = ", memRate, ",sec = ", sec ,",Close time = ", seg.CloseTimeStamp)
updateData, err := mock.Segment2JSON(*seg)
if err != nil {
return err
}
kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData)
//create new segment
newSegID := id.New().Uint64()
newSeg := mock.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0))
newSegData, err := mock.Segment2JSON(*&newSeg)
if err != nil {
return err
}
//save to kv store
kvbase.Save("segment/"+strconv.Itoa(int(newSegID)), newSegData)
// update collection data
c, _ := kvbase.Load("collection/" + strconv.Itoa(int(seg.CollectionID)))
collection, err := mock.JSON2Collection(c)
if err != nil {
return err
}
segIDs := collection.SegmentIDs
segIDs = append(segIDs, newSegID)
collection.SegmentIDs = segIDs
cData, err := mock.Collection2JSON(*collection)
if err != nil {
return err
}
kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), cData)
}
return nil
}
func UpdateSegmentStatus(ss mock.SegmentStats, kvbase kv.Base) error {
segmentData, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID)))
if err != nil {
return err
}
seg, err := mock.JSON2Segment(segmentData)
if err != nil {
return err
}
var changed bool
changed = false
if seg.Status != ss.Status {
changed = true
seg.Status = ss.Status
}
if seg.Rows != ss.Rows {
changed = true
seg.Rows = ss.Rows
}
if changed {
segData, err := mock.Segment2JSON(*seg)
if err != nil {
return err
}
err = kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), segData)
if err != nil {
return err
}
}
return nil
}
func GRPCServer(ch chan *messagepb.Mapping) error {
defaultGRPCPort := ":"
defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10)
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
return err
}
s := grpc.NewServer()
pb.RegisterMasterServer(s, GRPCMasterServer{CreateRequest: ch})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
return err
}
return nil
}
type GRPCMasterServer struct {
CreateRequest chan *messagepb.Mapping
}
func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *messagepb.Mapping) (*messagepb.Status, error) {
// ms.CreateRequest <- in2
fmt.Println("Handle a new create collection request")
err := WriteCollection2Datastore(in)
if err != nil {
return &messagepb.Status{
ErrorCode: 100,
Reason: "",
}, err
}
return &messagepb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexParam) (*message.Status, error) {
fmt.Println("Handle a new create index request")
err := UpdateCollectionIndex(in)
if err != nil {
return &messagepb.Status{
ErrorCode: 100,
Reason: "",
}, err
}
return &messagepb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
// func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *pb.CreateCollectionRequest) (*pb.CreateCollectionResponse, error) {
// return &pb.CreateCollectionResponse{
// CollectionName: in.CollectionName,
// }, nil
// }
func CollectionController(ch chan *messagepb.Mapping) {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
for collection := range ch {
sID := id.New().Uint64()
cID := id.New().Uint64()
s2ID := id.New().Uint64()
fieldMetas := []*messagepb.FieldMeta{}
if collection.Schema != nil {
fieldMetas = collection.Schema.FieldMetas
}
c := mock.NewCollection(cID, collection.CollectionName,
time.Now(), fieldMetas, []uint64{sID, s2ID},
[]string{"default"})
cm := mock.GrpcMarshal(&c)
s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0))
s2 := mock.NewSegment(s2ID, cID, collection.CollectionName, "default", 512, 1023, time.Now(), time.Unix(1<<36-1, 0))
collectionData, _ := mock.Collection2JSON(*cm)
segmentData, err := mock.Segment2JSON(s)
if err != nil {
log.Fatal(err)
}
s2Data, err := mock.Segment2JSON(s2)
if err != nil {
log.Fatal(err)
}
err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData)
if err != nil {
log.Fatal(err)
}
err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData)
if err != nil {
log.Fatal(err)
}
err = kvbase.Save("segment/"+strconv.FormatUint(s2ID, 10), s2Data)
if err != nil {
log.Fatal(err)
}
}
}
func WriteCollection2Datastore(collection *messagepb.Mapping) error {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
sID := id.New().Uint64()
cID := id.New().Uint64()
fieldMetas := []*messagepb.FieldMeta{}
if collection.Schema != nil {
fieldMetas = collection.Schema.FieldMetas
}
c := mock.NewCollection(cID, collection.CollectionName,
time.Now(), fieldMetas, []uint64{sID},
[]string{"default"})
cm := mock.GrpcMarshal(&c)
s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0))
collectionData, err := mock.Collection2JSON(*cm)
if err != nil {
log.Fatal(err)
return err
}
segmentData, err := mock.Segment2JSON(s)
if err != nil {
log.Fatal(err)
return err
}
err = kvbase.Save("collection/"+strconv.FormatUint(cID, 10), collectionData)
if err != nil {
log.Fatal(err)
return err
}
err = kvbase.Save("segment/"+strconv.FormatUint(sID, 10), segmentData)
if err != nil {
log.Fatal(err)
return err
}
return nil
}
func newKvBase() kv.Base {
func UpdateCollectionIndex(index *messagepb.IndexParam) error {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
......@@ -40,7 +299,31 @@ func newKvBase() kv.Base {
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
// defer cli.Close()
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
return kvbase
collectionName := index.CollectionName
c, err := kvbase.Load("collection/" + collectionName)
if err != nil {
return err
}
collection, err := mock.JSON2Collection(c)
if err != nil {
return err
}
for k, v := range collection.IndexParam {
if v.IndexName == index.IndexName {
collection.IndexParam[k] = v
}
}
collection.IndexParam = append(collection.IndexParam, index)
cm := mock.GrpcMarshal(collection)
collectionData, err := mock.Collection2JSON(*cm)
if err != nil {
return err
}
err = kvbase.Save("collection/"+collectionName, collectionData)
if err != nil {
return err
}
return nil
}
......@@ -21,6 +21,9 @@ Watcher::Watcher(const std::string &address,
}
void Watcher::Cancel() {
if (call_ == nullptr){
return;
}
call_->CancelWatch();
}
......
......@@ -14,17 +14,23 @@ namespace milvus {
namespace server {
namespace {
void ParseSegmentInfo(const std::string &json_str, SegmentInfo &segment_info) {
auto json = JSON::parse(json_str);
segment_info.set_segment_id(json["segment_id"].get<uint64_t>());
segment_info.set_partition_tag(json["partition_tag"].get<std::string>());
segment_info.set_channel_start(json["channel_start"].get<int32_t>());
segment_info.set_channel_end(json["channel_end"].get<int32_t>());
segment_info.set_open_timestamp(json["open_timestamp"].get<uint64_t>());
segment_info.set_close_timestamp(json["close_timestamp"].get<uint64_t>());
segment_info.set_collection_id(json["collection_id"].get<uint64_t>());
segment_info.set_collection_name(json["collection_name"].get<std::string>());
segment_info.set_rows(json["rows"].get<std::int64_t>());
Status ParseSegmentInfo(const std::string &json_str, SegmentInfo &segment_info) {
try {
auto json = JSON::parse(json_str);
segment_info.set_segment_id(json["segment_id"].get<uint64_t>());
segment_info.set_partition_tag(json["partition_tag"].get<std::string>());
segment_info.set_channel_start(json["channel_start"].get<int32_t>());
segment_info.set_channel_end(json["channel_end"].get<int32_t>());
segment_info.set_open_timestamp(json["open_timestamp"].get<uint64_t>());
segment_info.set_close_timestamp(json["close_timestamp"].get<uint64_t>());
segment_info.set_collection_id(json["collection_id"].get<uint64_t>());
segment_info.set_collection_name(json["collection_name"].get<std::string>());
segment_info.set_rows(json["rows"].get<std::int64_t>());
return Status::OK();
}
catch (const std::exception &e) {
return Status(DB_ERROR, e.what());
}
}
void ParseCollectionSchema(const std::string &json_str, Collection &collection) {
......@@ -64,7 +70,7 @@ Status MetaWrapper::Init() {
// init etcd watcher
auto f = [&](const etcdserverpb::WatchResponse &res) {
UpdateMeta(res);
UpdateMeta(res);
};
watcher_ = std::make_shared<milvus::master::Watcher>(etcd_addr, segment_path_, f, true);
return SyncMeta();
......@@ -87,10 +93,14 @@ void MetaWrapper::UpdateMeta(const etcdserverpb::WatchResponse &res) {
if (event_key.rfind(segment_path_, 0) == 0) {
// segment info
SegmentInfo segment_info;
ParseSegmentInfo(event_value, segment_info);
std::unique_lock lock(mutex_);
segment_infos_[segment_info.segment_id()] = segment_info;
lock.unlock();
auto status = ParseSegmentInfo(event_value, segment_info);
if (status.ok()) {
std::unique_lock lock(mutex_);
segment_infos_[segment_info.segment_id()] = segment_info;
lock.unlock();
} else {
return;
}
} else {
// table scheme
Collection collection;
......@@ -152,10 +162,14 @@ Status MetaWrapper::SyncMeta() {
} else {
assert(IsSegmentMetaKey(kv.key()));
SegmentInfo segment_info;
ParseSegmentInfo(kv.value(), segment_info);
std::unique_lock lock(mutex_);
segment_infos_[segment_info.segment_id()] = segment_info;
lock.unlock();
status = ParseSegmentInfo(kv.value(), segment_info);
if (status.ok()) {
std::unique_lock lock(mutex_);
segment_infos_[segment_info.segment_id()] = segment_info;
lock.unlock();
} else {
return status;
}
}
}
}
......
......@@ -242,6 +242,12 @@ Server::StartService() {
goto FAIL;
}
stat = MetaWrapper::GetInstance().Init();
if (!stat.ok()) {
LOG_SERVER_ERROR_ << "Meta start service fail: " << stat.message();
goto FAIL;
}
// Init pulsar message client
stat = MessageWrapper::GetInstance().Init();
if (!stat.ok()) {
......@@ -249,12 +255,6 @@ Server::StartService() {
goto FAIL;
}
stat = MetaWrapper::GetInstance().Init();
if (!stat.ok()) {
LOG_SERVER_ERROR_ << "Meta start service fail: " << stat.message();
goto FAIL;
}
grpc::GrpcServer::GetInstance().Start();
return Status::OK();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册