提交 544c701f 编写于 作者: N neza2017 提交者: yefu.chen

Start timeSyncMsgProducer in master

Signed-off-by: Nneza2017 <yefu.chen@zilliz.com>
上级 3ae7cd59
......@@ -20,8 +20,25 @@ func main() {
etcdAddress, _ := masterParams.Params.EtcdAddress()
etcdRootPath, _ := masterParams.Params.EtcdRootPath()
pulsarAddr, _ := masterParams.Params.PulsarAddress()
svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, []string{etcdAddress})
opt := master.Option{
KVRootPath: etcdRootPath,
MetaRootPath: etcdRootPath,
EtcdAddr: []string{etcdAddress},
PulsarAddr: pulsarAddr,
ProxyIDs: nil,
PulsarProxyChannels: nil,
PulsarProxySubName: "",
SoftTTBInterval: 0,
WriteIDs: nil,
PulsarWriteChannels: nil,
PulsarWriteSubName: "",
PulsarDMChannels: nil,
PulsarK2SChannels: nil,
}
svr, err := master.CreateServer(ctx, &opt)
if err != nil {
log.Print("create server failed", zap.Error(err))
}
......
......@@ -13,6 +13,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
......@@ -30,7 +31,23 @@ func TestMaster_CollectionTask(t *testing.T) {
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr})
opt := Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
}
svr, err := CreateServer(ctx, &opt)
assert.Nil(t, err)
err = svr.Run(10002)
assert.Nil(t, err)
......
......@@ -11,6 +11,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
......@@ -30,7 +31,23 @@ func TestMaster_CreateCollection(t *testing.T) {
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr})
opt := Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
}
svr, err := CreateServer(ctx, &opt)
assert.Nil(t, err)
err = svr.Run(10001)
assert.Nil(t, err)
......
......@@ -22,12 +22,38 @@ import (
"github.com/zilliztech/milvus-distributed/internal/master/id"
"github.com/zilliztech/milvus-distributed/internal/master/informer"
masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable"
"github.com/zilliztech/milvus-distributed/internal/master/timesync"
"github.com/zilliztech/milvus-distributed/internal/master/tso"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
// Server is the pd server.
type Option struct {
KVRootPath string
MetaRootPath string
EtcdAddr []string
PulsarAddr string
////softTimeTickBarrier
ProxyIDs []typeutil.UniqueID
PulsarProxyChannels []string //TimeTick
PulsarProxySubName string
SoftTTBInterval Timestamp //Physical Time + Logical Time
//hardTimeTickBarrier
WriteIDs []typeutil.UniqueID
PulsarWriteChannels []string
PulsarWriteSubName string
PulsarDMChannels []string
PulsarK2SChannels []string
}
type Master struct {
// Server state.
isServing int64
......@@ -54,6 +80,7 @@ type Master struct {
kvBase *kv.EtcdKV
scheduler *ddRequestScheduler
mt *metaTable
tsmp timesync.MsgProducer
// tso ticker
tsTicker *time.Ticker
......@@ -89,25 +116,57 @@ func Init() {
}
// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, kvRootPath, metaRootPath string, etcdAddr []string) (*Master, error) {
func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
//Init(etcdAddr, kvRootPath)
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr})
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: opt.EtcdAddr})
if err != nil {
return nil, err
}
etcdkv := kv.NewEtcdKV(etcdClient, metaRootPath)
etcdkv := kv.NewEtcdKV(etcdClient, opt.MetaRootPath)
metakv, err := NewMetaTable(etcdkv)
if err != nil {
return nil, err
}
//timeSyncMsgProducer
tsmp, err := timesync.NewTimeSyncMsgProducer(ctx)
if err != nil {
return nil, err
}
pulsarProxyStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarProxyStream.SetPulsarCient(opt.PulsarAddr)
pulsarProxyStream.CreatePulsarConsumers(opt.PulsarProxyChannels, opt.PulsarProxySubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarProxyStream.Start()
var proxyStream ms.MsgStream = pulsarProxyStream
proxyTimeTickBarrier := timesync.NewSoftTimeTickBarrier(ctx, &proxyStream, opt.ProxyIDs, opt.SoftTTBInterval)
tsmp.SetProxyTtBarrier(proxyTimeTickBarrier)
pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarWriteStream.SetPulsarCient(opt.PulsarAddr)
pulsarWriteStream.CreatePulsarConsumers(opt.PulsarWriteChannels, opt.PulsarWriteSubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarWriteStream.Start()
var writeStream ms.MsgStream = pulsarWriteStream
writeTimeTickBarrier := timesync.NewHardTimeTickBarrier(ctx, &writeStream, opt.WriteIDs)
tsmp.SetWriteNodeTtBarrier(writeTimeTickBarrier)
pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDMStream.SetPulsarCient(opt.PulsarAddr)
pulsarDMStream.CreatePulsarProducers(opt.PulsarDMChannels)
tsmp.SetDMSyncStream(pulsarDMStream)
pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarK2SStream.SetPulsarCient(opt.PulsarAddr)
pulsarK2SStream.CreatePulsarProducers(opt.PulsarK2SChannels)
tsmp.SetK2sSyncStream(pulsarK2SStream)
m := &Master{
ctx: ctx,
startTimestamp: time.Now().Unix(),
kvBase: newKVBase(kvRootPath, etcdAddr),
kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr),
scheduler: NewDDRequestScheduler(),
mt: metakv,
tsmp: tsmp,
ssChan: make(chan internalpb.SegmentStats, 10),
grpcErr: make(chan error),
pc: informer.NewPulsarClient(),
......@@ -189,6 +248,11 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)
//go s.Se
s.serverLoopWg.Add(1)
if err := s.tsmp.Start(); err != nil {
return err
}
s.serverLoopWg.Add(1)
go s.grpcLoop(grpcPort)
......@@ -209,6 +273,9 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
}
func (s *Master) stopServerLoop() {
s.tsmp.Close()
s.serverLoopWg.Done()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
log.Printf("server is closed, exit grpc server")
......
......@@ -14,6 +14,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
......@@ -34,8 +35,24 @@ func TestMaster_Partition(t *testing.T) {
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
opt := Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
}
port := 10000 + rand.Intn(1000)
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr})
svr, err := CreateServer(ctx, &opt)
assert.Nil(t, err)
err = svr.Run(int64(port))
assert.Nil(t, err)
......
package timesync
import "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
import (
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
)
type MsgProducer interface {
SetProxyTtBarrier(proxyTtBarrier TimeTickBarrier)
SetWriteNodeTtBarrier(writeNodeTtBarrier TimeTickBarrier)
SetDMSyncStream(dmSync ms.MsgStream)
SetK2sSyncStream(k2sSync ms.MsgStream)
Start() error
Close()
}
type TimeTickBarrier interface {
GetTimeTick() (Timestamp, error)
Start() error
......
......@@ -21,6 +21,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
var ctx context.Context
......@@ -48,7 +49,23 @@ func startMaster(ctx context.Context) {
kvRootPath := path.Join(rootPath, "kv")
metaRootPath := path.Join(rootPath, "meta")
svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, []string{etcdAddr})
opt := master.Option{
KVRootPath: kvRootPath,
MetaRootPath: metaRootPath,
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
}
svr, err := master.CreateServer(ctx, &opt)
masterServer = svr
if err != nil {
log.Print("create server failed", zap.Error(err))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册