提交 642554e7 编写于 作者: R rain 提交者: yefu.chen

Refactor directory structure and update the default value of server

Signed-off-by: Nrain <boyan.wang@zilliz.com>
上级 b948a343
package main
import (
"github.com/czs007/suvlim/pkg/master"
)
// func main() {
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
// cfg := config.NewConfig()
// s, err := server.CreateServer(ctx, cfg)
// if err != nil {
// panic(err)
// }
// err = s.Run()
// if err != nil {
// fmt.Println(err)
// }
// }
func init() {
// go mock.FakePulsarProducer()
}
func main() {
//master.SegmentStatsController()
master.CollectionController()
}
......@@ -5,31 +5,49 @@ go 1.15
require (
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 // indirect
github.com/BurntSushi/toml v0.3.1
github.com/apache/pulsar-client-go v0.2.0
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200825011529-c078454b47b6 // indirect
github.com/apache/pulsar/pulsar-client-go v0.0.0-20200901051823-800681aaa9af
github.com/coreos/etcd v3.3.25+incompatible // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f
github.com/danieljoos/wincred v1.1.0 // indirect
github.com/docker/go-units v0.4.0
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.2
github.com/google/btree v1.0.0
github.com/json-iterator/go v1.1.10
github.com/keybase/go-keychain v0.0.0-20200502122510-cda31fe0c86d // indirect
github.com/klauspost/compress v1.10.11 // indirect
github.com/minio/minio-go/v7 v7.0.5
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.4 // indirect
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48
github.com/prometheus/common v0.13.0 // indirect
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.6.1
github.com/tikv/client-go v0.0.0-20200824032810-95774393107b
github.com/tikv/pd v2.1.19+incompatible
github.com/yahoo/athenz v1.9.16 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.15.0
golang.org/x/net v0.0.0-20200822124328-c89045814202
google.golang.org/grpc v1.31.1
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect
golang.org/x/net v0.0.0-20200904194848-62affa334b73
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43 // indirect
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f // indirect
google.golang.org/grpc v1.31.0
google.golang.org/grpc/examples v0.0.0-20200828165940-d8ef479ab79a // indirect
google.golang.org/protobuf v1.25.0
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
sigs.k8s.io/yaml v1.2.0 // indirect
)
replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5
//replace go.etcd.io/etcd => go.etcd.io/etcd v0.5.0-alpha.5.0.20200329194405-dd816f0735f8
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
replace github.com/coreos/etcd => github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible
此差异已折叠。
......@@ -18,7 +18,9 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/czs007/suvlim/util/grpcutil"
//"google.golang.org/grpc"
"net/url"
"os"
......@@ -37,8 +39,8 @@ import (
// Config is the pd server configuration.
type Config struct {
flagSet *flag.FlagSet
Version bool `json:"-"`
flagSet *flag.FlagSet
Version bool `json:"-"`
ConfigCheck bool `json:"-"`
ClientUrls string `toml:"client-urls" json:"client-urls"`
......@@ -46,31 +48,29 @@ type Config struct {
AdvertiseClientUrls string `toml:"advertise-client-urls" json:"advertise-client-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
EnableGRPCGateway bool `json:"enable-grpc-gateway"`
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"`
InitialClusterToken string `toml:"initial-cluster-token" json:"initial-cluster-token"`
LeaderLease int64 `toml:"lease" json:"lease"`
Log log.Config `toml:"log" json:"log"`
LogFileDeprecated string `toml:"log-file" json:"log-file,omitempty"`
LogLevelDeprecated string `toml:"log-level" json:"log-level,omitempty"`
LeaderLease int64 `toml:"lease" json:"lease"`
Log log.Config `toml:"log" json:"log"`
LogFileDeprecated string `toml:"log-file" json:"log-file,omitempty"`
LogLevelDeprecated string `toml:"log-level" json:"log-level,omitempty"`
PDServerCfg PDServerConfig `toml:"pd-server" json:"pd-server"`
TickInterval typeutil.Duration `toml:"tick-interval"`
ElectionInterval typeutil.Duration `toml:"election-interval"`
TickInterval typeutil.Duration `toml:"tick-interval"`
ElectionInterval typeutil.Duration `toml:"election-interval"`
DisableStrictReconfigCheck bool
// TsoSaveInterval is the interval to save timestamp.
TsoSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`
PreVote bool `toml:"enable-prevote"`
PreVote bool `toml:"enable-prevote"`
Security grpcutil.SecurityConfig `toml:"security" json:"security"`
configFile string
......@@ -89,35 +89,35 @@ type Config struct {
// NewConfig creates a new config.
func NewConfig() *Config {
cfg := &Config{}
cfg.HeartbeatStreamBindInterval = typeutil.NewDuration(defaultHeartbeatStreamRebindInterval)
cfg.TickInterval = typeutil.NewDuration(defaultTickInterval)
cfg.ElectionInterval = typeutil.NewDuration(defaultElectionInterval)
cfg.flagSet = flag.NewFlagSet("pd", flag.ContinueOnError)
fs := cfg.flagSet
fs.BoolVar(&cfg.Version, "V", false, "print version information and exit")
fs.BoolVar(&cfg.Version, "version", false, "print version information and exit")
fs.StringVar(&cfg.configFile, "config", "", "config file")
fs.BoolVar(&cfg.ConfigCheck, "config-check", false, "check config file validity and exit")
fs.StringVar(&cfg.Name, "name", "", "human-readable name for this pd member")
fs.StringVar(&cfg.Name, "name", defaultName, "human-readable name for this pd member")
fs.StringVar(&cfg.DataDir, "data-dir", "", "path to the data directory (default 'default.${name}')")
fs.StringVar(&cfg.DataDir, "data-dir", "/data/pd/data", "path to the data directory (default 'default.${name}')")
fs.StringVar(&cfg.ClientUrls, "client-urls", defaultClientUrls, "url for client traffic")
fs.StringVar(&cfg.AdvertiseClientUrls, "advertise-client-urls", "", "advertise url for client traffic (default '${client-urls}')")
fs.StringVar(&cfg.AdvertiseClientUrls, "advertise-client-urls", defaultClientUrls, "advertise url for client traffic (default '${client-urls}')")
fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "url for peer traffic")
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", "advertise url for peer traffic (default '${peer-urls}')")
fs.StringVar(&cfg.InitialCluster, "initial-cluster", "", "initial cluster configuration for bootstrapping, e,g. pd=http://127.0.0.1:2380")
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", defaultPeerUrls, "advertise url for peer traffic (default '${peer-urls}')")
fs.StringVar(&cfg.InitialCluster, "initial-cluster", "pd=http://127.0.0.1:12380", "initial cluster configuration for bootstrapping, e,g. pd=http://127.0.0.1:2380")
fs.StringVar(&cfg.InitialClusterState, "initial-cluster-start", defaultInitialClusterState, "initial cluster state")
fs.StringVar(&cfg.Log.Level, "L", "", "log level: debug, info, warn, error, fatal (default 'info')")
fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path")
return cfg
}
const (
defaultLeaderLease = int64(3)
defaultLeaderLease = int64(3)
defaultName = "pd"
defaultClientUrls = "http://127.0.0.1:2379"
defaultPeerUrls = "http://127.0.0.1:2380"
defaultClientUrls = "http://127.0.0.1:12379,http://127.0.0.1:12378,http://127.0.0.1:12377"
defaultPeerUrls = "http://127.0.0.1:12380,http://127.0.0.1:12381,http://127.0.0.1:12382"
defaultInitialClusterState = embed.ClusterStateFlagNew
defaultInitialClusterToken = "pd-cluster"
......@@ -131,7 +131,7 @@ const (
defaultHeartbeatStreamRebindInterval = time.Minute
defaultMaxResetTSGap = 24 * time.Hour
defaultMaxResetTSGap = 24 * time.Hour
defaultEnableGRPCGateway = true
defaultDisableErrorVerbose = true
)
......@@ -344,7 +344,6 @@ func (c *Config) Adjust(meta *toml.MetaData) error {
adjustDuration(&c.TickInterval, defaultTickInterval)
adjustDuration(&c.ElectionInterval, defaultElectionInterval)
if err := c.PDServerCfg.adjust(configMetaData.Child("pd-server")); err != nil {
return err
}
......@@ -389,7 +388,6 @@ func (c *Config) configFromFile(path string) (*toml.MetaData, error) {
return &meta, errors.WithStack(err)
}
// PDServerConfig is the configuration for pd server.
type PDServerConfig struct {
// MaxResetTSGap is the max gap to reset the tso.
......@@ -404,11 +402,10 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
// Clone returns a cloned PD server config.
func (c *PDServerConfig) Clone() *PDServerConfig {
return &PDServerConfig{
MaxResetTSGap: c.MaxResetTSGap,
MaxResetTSGap: c.MaxResetTSGap,
}
}
// ParseUrls parse a string into multiple urls.
// Export for api.
func ParseUrls(s string) ([]url.URL, error) {
......@@ -507,12 +504,11 @@ func (c *Config) GenEmbedEtcdConfig() (*embed.Config, error) {
cfg.PeerTLSInfo.CertFile = c.Security.CertPath
cfg.PeerTLSInfo.KeyFile = c.Security.KeyPath
cfg.PeerTLSInfo.AllowedCN = allowedCN
cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(c.logger, c.logger.Core(), c.logProps.Syncer)
// cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(c.logger, c.logger.Core(), c.logProps.Syncer)
cfg.EnableGRPCGateway = c.EnableGRPCGateway
cfg.EnableV2 = true
cfg.Logger = "zap"
var err error
cfg.LPUrls, err = ParseUrls(c.PeerUrls)
if err != nil {
return nil, err
......
......@@ -20,11 +20,9 @@ import (
//"fmt"
"math/rand"
"path"
"github.com/czs007/suvlim/master/member"
"strconv"
"strings"
"sync"
"sync/atomic"
......@@ -40,13 +38,14 @@ import (
"github.com/czs007/suvlim/master/config"
"github.com/czs007/suvlim/master/id"
"github.com/czs007/suvlim/master/meta"
"github.com/czs007/suvlim/util/typeutil"
"github.com/czs007/suvlim/util/logutil"
"github.com/czs007/suvlim/util/typeutil"
//"github.com/czs007/suvlim/master/kv"
"github.com/czs007/suvlim/master/tso"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
//"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
"google.golang.org/grpc"
......@@ -125,7 +124,6 @@ type Server struct {
func CreateServer(ctx context.Context, cfg *config.Config) (*Server, error) {
log.Info("PD Config", zap.Reflect("config", cfg))
rand.Seed(time.Now().UnixNano())
s := &Server{
cfg: cfg,
persistOptions: config.NewPersistOptions(cfg),
......@@ -141,7 +139,6 @@ func CreateServer(ctx context.Context, cfg *config.Config) (*Server, error) {
if err != nil {
return nil, err
}
etcdCfg.ServiceRegister = func(gs *grpc.Server) {
pdpb.RegisterPDServer(gs, s)
//diagnosticspb.RegisterDiagnosticsServer(gs, s)
......@@ -150,7 +147,6 @@ func CreateServer(ctx context.Context, cfg *config.Config) (*Server, error) {
s.lg = cfg.GetZapLogger()
s.logProps = cfg.GetZapLogProperties()
s.lg = cfg.GetZapLogger()
s.logProps = cfg.GetZapLogProperties()
return s, nil
......@@ -178,7 +174,6 @@ func (s *Server) startEtcd(ctx context.Context) error {
//if err = etcdutil.CheckClusterID(etcd.Server.Cluster().ID(), urlMap, tlsConfig); err != nil {
// return err
//}
select {
// Wait etcd until it is ready to use
case <-etcd.Server.ReadyNotify():
......@@ -233,17 +228,6 @@ func (s *Server) startServer(ctx context.Context) error {
// It may lose accuracy if use float64 to store uint64. So we store the
// cluster id in label.
//metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", s.clusterID)).Set(0)
s.rootPath = path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
s.tsoAllocator = tso.NewGlobalTSOAllocator(
s.member.GetLeadership(),
s.rootPath,
s.cfg.TsoSaveInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
kvBase := kv.NewEtcdKVBase(s.client, s.rootPath)
// path := filepath.Join(s.cfg.DataDir, "region-meta")
......@@ -335,7 +319,6 @@ func (s *Server) Run() error {
if err := s.startEtcd(s.ctx); err != nil {
return err
}
if err := s.startServer(s.ctx); err != nil {
return err
}
......
# How to start a master
## Requirements
## Start from code
## Start with docker
package common
import "time"
const (
PULSAR_URL = "pulsar://localhost:16650"
PULSAR_MONITER_INTERVAL = 1 * time.Second
PULSAR_TOPIC = "monitor-topic"
ETCD_ROOT_PATH = "by-dev"
)
package informer
type Informer interface {
Listener(key interface{}) (interface{}, error)
}
package informer
import (
"context"
"fmt"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/pkg/master/common"
"github.com/czs007/suvlim/pkg/master/mock"
)
func NewPulsarClient() PulsarClient {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: common.PULSAR_URL,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
return PulsarClient{
Client: client,
}
}
type PulsarClient struct {
Client pulsar.Client
}
func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error {
consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{
Topic: common.PULSAR_TOPIC,
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.TODO())
if err != nil {
log.Fatal(err)
}
m, _ := mock.SegmentUnMarshal(msg.Payload())
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), m.SegementID)
ssChan <- m
consumer.Ack(msg)
}
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
return nil
}
package kv
import (
"context"
"path"
"time"
"github.com/czs007/suvlim/errors"
"github.com/czs007/suvlim/util/etcdutil"
"github.com/pingcap/log"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)
const (
requestTimeout = 10 * time.Second
slowRequestTime = 1 * time.Second
)
var (
errTxnFailed = errors.New("failed to commit transaction")
)
type etcdKVBase struct {
client *clientv3.Client
rootPath string
}
// NewEtcdKVBase creates a new etcd kv.
func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase {
return &etcdKVBase{
client: client,
rootPath: rootPath,
}
}
func (kv *etcdKVBase) Load(key string) (string, error) {
key = path.Join(kv.rootPath, key)
resp, err := etcdutil.EtcdKVGet(kv.client, key)
if err != nil {
return "", err
}
if n := len(resp.Kvs); n == 0 {
return "", nil
} else if n > 1 {
return "", errors.Errorf("load more than one kvs: key %v kvs %v", key, n)
}
return string(resp.Kvs[0].Value), nil
}
func (kv *etcdKVBase) Save(key, value string) error {
key = path.Join(kv.rootPath, key)
txn := NewSlowLogTxn(kv.client)
resp, err := txn.Then(clientv3.OpPut(key, value)).Commit()
if err != nil {
log.Error("save to etcd meet error", zap.String("key", key), zap.String("value", value))
return errors.WithStack(err)
}
if !resp.Succeeded {
return errors.WithStack(errTxnFailed)
}
return nil
}
func (kv *etcdKVBase) Remove(key string) error {
key = path.Join(kv.rootPath, key)
txn := NewSlowLogTxn(kv.client)
resp, err := txn.Then(clientv3.OpDelete(key)).Commit()
if err != nil {
log.Error("remove from etcd meet error", zap.String("key", key))
return errors.WithStack(err)
}
if !resp.Succeeded {
return errors.WithStack(errTxnFailed)
}
return nil
}
// SlowLogTxn wraps etcd transaction and log slow one.
type SlowLogTxn struct {
clientv3.Txn
cancel context.CancelFunc
}
// NewSlowLogTxn create a SlowLogTxn.
func NewSlowLogTxn(client *clientv3.Client) clientv3.Txn {
ctx, cancel := context.WithTimeout(client.Ctx(), requestTimeout)
return &SlowLogTxn{
Txn: client.Txn(ctx),
cancel: cancel,
}
}
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
func (t *SlowLogTxn) If(cs ...clientv3.Cmp) clientv3.Txn {
return &SlowLogTxn{
Txn: t.Txn.If(cs...),
cancel: t.cancel,
}
}
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
func (t *SlowLogTxn) Then(ops ...clientv3.Op) clientv3.Txn {
return &SlowLogTxn{
Txn: t.Txn.Then(ops...),
cancel: t.cancel,
}
}
// Commit implements Txn Commit interface.
func (t *SlowLogTxn) Commit() (*clientv3.TxnResponse, error) {
start := time.Now()
resp, err := t.Txn.Commit()
t.cancel()
cost := time.Since(start)
if cost > slowRequestTime {
log.Warn("txn runs too slow",
zap.Error(err),
zap.Reflect("response", resp),
zap.Duration("cost", cost))
}
//label := "success"
//if err != nil {
// label = "failed"
//}
//txnCounter.WithLabelValues(label).Inc()
//txnDuration.WithLabelValues(label).Observe(cost.Seconds())
return resp, errors.WithStack(err)
}
package kv
type Base interface {
Load(key string) (string, error)
Save(key, value string) error
Remove(key string) error
}
package mock
import (
"time"
jsoniter "github.com/json-iterator/go"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Collection struct {
ID uint64 `json:"id"`
Name string `json:"name"`
CreateTime time.Time `json:"creat_time"`
SegmentIDs []uint64 `json:"segment_ids"`
PartitionTags []string `json:"partition_tags"`
}
func FakeCreateCollection(id uint64) Collection {
cl := Collection{
ID: id,
Name: "test-collection",
CreateTime: time.Now(),
SegmentIDs: []uint64{uint64(10111)},
PartitionTags: []string{"default"},
}
return cl
}
func Collection2JSON(c Collection) (string, error) {
b, err := json.Marshal(&c)
if err != nil {
return "", err
}
return string(b), nil
}
func JSON2Collection(s string) (Collection, error) {
var c Collection
err := json.Unmarshal([]byte(s), &c)
if err != nil {
return Collection{}, err
}
return c, nil
}
package mock
import (
"fmt"
"testing"
"time"
)
var C = Collection{
ID: uint64(11111),
Name: "test-collection",
CreateTime: time.Now(),
SegmentIDs: []uint64{uint64(10111)},
PartitionTags: []string{"default"},
}
func TestCollection2JSON(t *testing.T) {
res, err := Collection2JSON(C)
if err != nil {
t.Error(err)
}
fmt.Println(res)
}
package mock
func MockServer() {
}
package mock
import (
"context"
"fmt"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/czs007/suvlim/pkg/master/common"
)
func FakePulsarProducer() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: common.PULSAR_URL,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: common.PULSAR_TOPIC,
})
testSegmentStats, _ := SegmentMarshal(SegmentStats{
SegementID: uint64(1111),
MemorySize: uint64(333322),
MemoryRate: float64(0.13),
})
for {
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: testSegmentStats,
})
time.Sleep(1 * time.Second)
}
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
}
//TODO add a mock: memory increase
package mock
import (
"bytes"
"encoding/gob"
"time"
)
type SegmentStats struct {
SegementID uint64
MemorySize uint64
MemoryRate float64
}
func SegmentMarshal(s SegmentStats) ([]byte, error) {
var nb bytes.Buffer
enc := gob.NewEncoder(&nb)
err := enc.Encode(s)
if err != nil {
return []byte{}, err
}
return nb.Bytes(), nil
}
func SegmentUnMarshal(data []byte) (SegmentStats, error) {
var ss SegmentStats
dec := gob.NewDecoder(bytes.NewBuffer(data))
err := dec.Decode(&ss)
if err != nil {
return SegmentStats{}, err
}
return ss, nil
}
type Segment struct {
SegmentID uint64 `json:"segment_id"`
Collection Collection `json:"collection"`
PartitionTag string `json:"partition_tag"`
ChannelStart int `json:"channel_start"`
ChannelEnd int `json:"channel_end"`
OpenTimeStamp time.Time `json:"open_timestamp"`
CloseTimeStamp time.Time `json:"clost_timestamp"`
}
func Segment2JSON(s Segment) (string, error) {
b, err := json.Marshal(&s)
if err != nil {
return "", err
}
return string(b), nil
}
func JSON2Segment(s string) (Segment, error) {
var c Segment
err := json.Unmarshal([]byte(s), &c)
if err != nil {
return Segment{}, err
}
return c, nil
}
func FakeCreateSegment(id uint64, cl Collection, opentime time.Time, closetime time.Time) Segment {
seg := Segment{
SegmentID: id,
Collection: cl,
PartitionTag: "default",
ChannelStart: 0,
ChannelEnd: 100,
OpenTimeStamp: opentime,
CloseTimeStamp: closetime,
}
return seg
}
package mock
import (
"fmt"
"testing"
"time"
)
func TestSegmentMarshal(t *testing.T) {
s := SegmentStats{
SegementID: uint64(12315),
MemorySize: uint64(233113),
MemoryRate: float64(0.13),
}
data, err := SegmentMarshal(s)
if err != nil {
t.Error(err)
}
ss, err := SegmentUnMarshal(data)
if err != nil {
t.Error(err)
}
if ss.MemoryRate != s.MemoryRate {
fmt.Println(ss.MemoryRate)
fmt.Println(s.MemoryRate)
t.Error("Error when marshal")
}
}
var Ts = Segment{
SegmentID: uint64(101111),
Collection: Collection{
ID: uint64(11111),
Name: "test-collection",
CreateTime: time.Now(),
SegmentIDs: []uint64{uint64(10111)},
PartitionTags: []string{"default"},
},
PartitionTag: "default",
ChannelStart: 1,
ChannelEnd: 100,
OpenTimeStamp: time.Now(),
CloseTimeStamp: time.Now().Add(1 * time.Hour),
}
func TestSegment2JSON(t *testing.T) {
res, err := Segment2JSON(Ts)
if err != nil {
t.Error(err)
}
fmt.Println(res)
}
package master
import (
"fmt"
"time"
"github.com/czs007/suvlim/pkg/master/common"
"github.com/czs007/suvlim/pkg/master/informer"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/pkg/master/mock"
"go.etcd.io/etcd/clientv3"
)
func SegmentStatsController() {
ssChan := make(chan mock.SegmentStats, 10)
defer close(ssChan)
ssClient := informer.NewPulsarClient()
go ssClient.Listener(ssChan)
for {
select {
case ss := <-ssChan:
fmt.Println(ss)
case <-time.After(5 * time.Second):
fmt.Println("timeout")
return
}
}
}
func GRPCServer() {
}
func CollectionController() {
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:12379"},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH)
c := mock.FakeCreateCollection(uint64(3333))
s := mock.FakeCreateSegment(uint64(11111), c, time.Now(), time.Unix(1<<63-1, 0))
collectionData, _ := mock.Collection2JSON(c)
segmentData, _ := mock.Segment2JSON(s)
kvbase.Save("test-collection", collectionData)
kvbase.Save("test-segment", segmentData)
fmt.Println(kvbase.Load("test-collection"))
fmt.Println(kvbase.Load("test-segment"))
}
func Sync() {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册