From 5918c791e785f4ac111512533b65f223ef38e723 Mon Sep 17 00:00:00 2001 From: JJW <382689420@qq.com> Date: Mon, 1 Mar 2021 15:48:29 +0800 Subject: [PATCH] feat(core): merge multiple streams into one stream (#134) * feat(core): Combine multiple streams into one stream Co-authored-by: jjwygjj --- example/flow/app.go | 2 +- example/trainingmodel/flow/app.go | 41 +++ example/trainingmodel/source-data-a/main.go | 59 ++++ example/trainingmodel/source-data-b/main.go | 62 +++++ example/trainingmodel/zipper/workflow.yaml | 7 + go.mod | 4 +- go.sum | 9 +- internal/cmd/dev.go | 25 +- internal/cmd/run.go | 39 ++- internal/cmd/wf/dev.go | 7 +- internal/cmd/wf/run.go | 9 +- internal/workflow/runtime.go | 156 ++++++++--- pkg/quic/quic-go.go | 31 ++- pkg/quic/stream.go | 16 +- pkg/rx/rxstream_operator.go | 106 ++++++- pkg/yy3/yy3.go | 290 ++++++++++++++++++++ 16 files changed, 758 insertions(+), 105 deletions(-) create mode 100644 example/trainingmodel/flow/app.go create mode 100644 example/trainingmodel/source-data-a/main.go create mode 100644 example/trainingmodel/source-data-b/main.go create mode 100644 example/trainingmodel/zipper/workflow.yaml create mode 100644 pkg/yy3/yy3.go diff --git a/example/flow/app.go b/example/flow/app.go index 1e88db8..1c9922a 100644 --- a/example/flow/app.go +++ b/example/flow/app.go @@ -23,7 +23,7 @@ type NoiseData struct { var printer = func(_ context.Context, i interface{}) (interface{}, error) { value := i.(NoiseData) rightNow := time.Now().UnixNano() / int64(time.Millisecond) - fmt.Printf("[%s] %d > value: %f ⚡️=%dms\n", value.From, value.Time, value.Noise, rightNow-value.Time) + fmt.Println(fmt.Sprintf("[%s] %d > value: %f ⚡️=%dms", value.From, value.Time, value.Noise, rightNow-value.Time)) return i, nil } diff --git a/example/trainingmodel/flow/app.go b/example/trainingmodel/flow/app.go new file mode 100644 index 0000000..332e977 --- /dev/null +++ b/example/trainingmodel/flow/app.go @@ -0,0 +1,41 @@ +package main + +import ( + "context" + "fmt" + + y3 "github.com/yomorun/y3-codec-golang" + "github.com/yomorun/yomo/pkg/rx" +) + +const DataAKey = 0x11 +const DataBKey = 0x12 + +var callback = func(v []byte) (interface{}, error) { + return y3.ToFloat32(v) +} + +var printera = func(_ context.Context, i interface{}) (interface{}, error) { + value := i.(float32) + fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-a", value)) + return i, nil +} + +var printerb = func(_ context.Context, i interface{}) (interface{}, error) { + value := i.(float32) + fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-b", value)) + return i, nil +} + +var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) { + + return fmt.Sprintf("⚡️ Zip [%s],[%s] -> Value: %f, %f", "dataA", "dataB", ia.(float32), ib.(float32)), nil +} + +func Handler(rxstream rx.RxStream) rx.RxStream { + streamA := rxstream.Subscribe(DataAKey).OnObserve(callback).Map(printera) + streamB := rxstream.Subscribe(DataBKey).OnObserve(callback).Map(printerb) + + stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x10) + return stream +} diff --git a/example/trainingmodel/source-data-a/main.go b/example/trainingmodel/source-data-a/main.go new file mode 100644 index 0000000..10e32d0 --- /dev/null +++ b/example/trainingmodel/source-data-a/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "context" + "log" + "math/rand" + "os" + "time" + + y3 "github.com/yomorun/y3-codec-golang" + "github.com/yomorun/yomo/pkg/quic" +) + +var zipperAddr = os.Getenv("YOMO_ZIPPER_ENDPOINT") + +func main() { + if zipperAddr == "" { + zipperAddr = "localhost:9999" + } + err := emit(zipperAddr) + if err != nil { + log.Printf("❌ Emit the data to yomo-zipper %s failure with err: %v", zipperAddr, err) + } +} + +func emit(addr string) error { + client, err := quic.NewClient(addr) + if err != nil { + return err + } + log.Printf("✅ Connected to yomo-zipper %s", addr) + + stream, err := client.CreateStream(context.Background()) + if err != nil { + return err + } + + generateAndSendData(stream) + + return nil +} + +var codec = y3.NewCodec(0x11) + +func generateAndSendData(stream quic.Stream) { + for { + time.Sleep(100 * time.Millisecond) + num := rand.New(rand.NewSource(time.Now().UnixNano())).Float32() * 200 + + sendingBuf, _ := codec.Marshal(num) + + _, err := stream.Write(sendingBuf) + if err != nil { + log.Printf("❌ Emit %v to yomo-zipper failure with err: %v", num, err) + } else { + log.Printf("✅ Emit %v to yomo-zipper", num) + } + } +} diff --git a/example/trainingmodel/source-data-b/main.go b/example/trainingmodel/source-data-b/main.go new file mode 100644 index 0000000..c8dc40d --- /dev/null +++ b/example/trainingmodel/source-data-b/main.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "log" + "math/rand" + "os" + "time" + + y3 "github.com/yomorun/y3-codec-golang" + "github.com/yomorun/yomo/pkg/quic" +) + +var zipperAddr = os.Getenv("YOMO_ZIPPER_ENDPOINT") + +func main() { + if zipperAddr == "" { + zipperAddr = "localhost:9999" + } + err := emit(zipperAddr) + if err != nil { + log.Printf("❌ Emit the data to yomo-zipper %s failure with err: %v", zipperAddr, err) + } +} + +func emit(addr string) error { + + client, err := quic.NewClient(addr) + if err != nil { + return err + } + log.Printf("✅ Connected to yomo-zipper %s", addr) + + stream, err := client.CreateStream(context.Background()) + if err != nil { + return err + } + + generateAndSendData(stream) + + return nil +} + +var codec = y3.NewCodec(0x12) + +func generateAndSendData(stream quic.Stream) { + + for { + time.Sleep(100 * time.Millisecond) + + num := rand.New(rand.NewSource(time.Now().UnixNano())).Float32() * 2000 + + sendingBuf, _ := codec.Marshal(num) + + _, err := stream.Write(sendingBuf) + if err != nil { + log.Printf("❌ Emit %v to yomo-zipper failure with err: %v", num, err) + } else { + log.Printf("✅ Emit %v to yomo-zipper", num) + } + } +} diff --git a/example/trainingmodel/zipper/workflow.yaml b/example/trainingmodel/zipper/workflow.yaml new file mode 100644 index 0000000..9407277 --- /dev/null +++ b/example/trainingmodel/zipper/workflow.yaml @@ -0,0 +1,7 @@ +name: Service +host: localhost +port: 9999 +flows: + - name: training + host: localhost + port: 4242 \ No newline at end of file diff --git a/go.mod b/go.mod index 64c763f..b577b18 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/cenkalti/backoff/v4 v4.1.0 github.com/lucas-clemente/quic-go v0.19.3 github.com/reactivex/rxgo/v2 v2.4.0 - github.com/spf13/cobra v1.1.3 - github.com/yomorun/y3-codec-golang v1.6.4 + github.com/spf13/cobra v1.1.1 + github.com/yomorun/y3-codec-golang v1.6.6 golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f // indirect gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index b68be34..aa2a413 100644 --- a/go.sum +++ b/go.sum @@ -257,6 +257,8 @@ github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v1.1.1 h1:KfztREH0tPxJJ+geloSLaAkaPkr4ki2Er5quFV1TDo4= +github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI= github.com/spf13/cobra v1.1.3 h1:xghbfqPkxzxP3C/f3n5DdpAbdKLj4ZE4BWQI362l53M= github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= @@ -280,10 +282,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1 github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -github.com/yomorun/y3-codec-golang v1.6.3 h1:7JTqWbdeKp9ZeIlIoZTFaMC8/PwQ2HKFVOCbxAEY2B0= -github.com/yomorun/y3-codec-golang v1.6.3/go.mod h1:R+y8hQ/AHZ1tDzWtmspVeX7omqVWFJ42gdlXIOp35rA= -github.com/yomorun/y3-codec-golang v1.6.4 h1:2XZBafqNXvHp8PoVrEE3r4WW/nOGJZg/d2Hh0A0WBuk= -github.com/yomorun/y3-codec-golang v1.6.4/go.mod h1:R+y8hQ/AHZ1tDzWtmspVeX7omqVWFJ42gdlXIOp35rA= +github.com/yomorun/y3-codec-golang v1.6.6 h1:Cmqag2WYTyt3GcuuDJRVayFTniWGHE+7r68G/LNOMqk= +github.com/yomorun/y3-codec-golang v1.6.6/go.mod h1:R+y8hQ/AHZ1tDzWtmspVeX7omqVWFJ42gdlXIOp35rA= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -473,6 +473,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/internal/cmd/dev.go b/internal/cmd/dev.go index 193f890..3f7a0c7 100644 --- a/internal/cmd/dev.go +++ b/internal/cmd/dev.go @@ -1,7 +1,9 @@ package cmd import ( + "context" "fmt" + "io" "log" "plugin" @@ -39,6 +41,7 @@ func NewCmdDev() *cobra.Command { quicHandler := &quicDevHandler{ serverlessHandle: slHandler, serverAddr: fmt.Sprintf("localhost:%d", opts.Port), + readers: make(chan io.Reader), } err = serverless.Run(endpoint, quicHandler) @@ -57,16 +60,16 @@ func NewCmdDev() *cobra.Command { type quicDevHandler struct { serverlessHandle plugin.Symbol serverAddr string + readers chan io.Reader } -func (s quicDevHandler) Listen() error { +func (s *quicDevHandler) Listen() error { err := mocker.EmitMockDataFromCloud(s.serverAddr) - return err -} - -func (s quicDevHandler) Read(st quic.Stream) error { - stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st)) - + if err != nil { + return err + } + rxstream := rx.FromReaderWithY3(s.readers) + stream := dispatcher.Dispatcher(s.serverlessHandle, rxstream) go func() { for customer := range stream.Observe() { if customer.Error() { @@ -74,5 +77,13 @@ func (s quicDevHandler) Read(st quic.Stream) error { } } }() + + rxstream.Connect(context.Background()) + + return nil +} + +func (s *quicDevHandler) Read(st quic.Stream) error { + s.readers <- st return nil } diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 50e9502..2f17129 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -1,8 +1,11 @@ package cmd import ( + "context" "fmt" + "io" "log" + "math/rand" "os" "plugin" @@ -44,6 +47,8 @@ func NewCmdRun() *cobra.Command { endpoint := fmt.Sprintf("0.0.0.0:%d", opts.Port) quicHandler := &quicServerHandler{ serverlessHandle: slHandler, + readers: make(chan io.Reader), + writers: make([]io.Writer, 0), } err = serverless.Run(endpoint, quicHandler) @@ -61,23 +66,45 @@ func NewCmdRun() *cobra.Command { type quicServerHandler struct { serverlessHandle plugin.Symbol + readers chan io.Reader + writers []io.Writer } -func (s quicServerHandler) Listen() error { - return nil -} +func (s *quicServerHandler) Listen() error { + rxstream := rx.FromReaderWithY3(s.readers) -func (s quicServerHandler) Read(st quic.Stream) error { - stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st)) + stream := dispatcher.Dispatcher(s.serverlessHandle, rxstream) + rxstream.Connect(context.Background()) go func() { for customer := range stream.Observe() { if customer.Error() { fmt.Println(customer.E.Error()) } else if customer.V != nil { - st.Write((customer.V).([]byte)) + index := rand.Intn(len(s.writers)) + + loop: + for i, w := range s.writers { + if index == i { + _, err := w.Write((customer.V).([]byte)) + if err != nil { + index = rand.Intn(len(s.writers)) + break loop + } + } else { + w.Write([]byte{0}) + } + } + } } }() + + return nil +} + +func (s *quicServerHandler) Read(st quic.Stream) error { + s.readers <- st + s.writers = append(s.writers, st) return nil } diff --git a/internal/cmd/wf/dev.go b/internal/cmd/wf/dev.go index 0abf2f9..7b2b68b 100644 --- a/internal/cmd/wf/dev.go +++ b/internal/cmd/wf/dev.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "time" "github.com/spf13/cobra" "github.com/yomorun/yomo/internal/conf" @@ -68,8 +69,9 @@ func (s *quicDevHandler) Listen() error { } func (s *quicDevHandler) Read(st quic.Stream) error { + id := time.Now().UnixNano() - flows, sinks := workflow.Build(s.serverlessConfig) + flows, sinks := workflow.Build(s.serverlessConfig, id) stream := dispatcher.DispatcherWithFunc(flows, st) @@ -82,6 +84,9 @@ func (s *quicDevHandler) Read(st quic.Stream) error { value := customer.V.([]byte) + if len(value) == 1 && value[0] == byte(0) { + continue + } for _, sink := range sinks { go func(_sink func() (io.Writer, func()), buf []byte) { writer, cancel := _sink() diff --git a/internal/cmd/wf/run.go b/internal/cmd/wf/run.go index 12252b9..571c679 100644 --- a/internal/cmd/wf/run.go +++ b/internal/cmd/wf/run.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "time" "github.com/spf13/cobra" "github.com/yomorun/yomo/internal/conf" @@ -65,7 +66,8 @@ func (s *quicHandler) Listen() error { } func (s *quicHandler) Read(st quic.Stream) error { - flows, sinks := workflow.Build(s.serverlessConfig) + id := time.Now().UnixNano() + flows, sinks := workflow.Build(s.serverlessConfig, id) stream := dispatcher.DispatcherWithFunc(flows, st) @@ -76,16 +78,19 @@ func (s *quicHandler) Read(st quic.Stream) error { continue } value := customer.V.([]byte) + if len(value) == 1 && value[0] == byte(0) { + continue + } for _, sink := range sinks { go func(_sink func() (io.Writer, func()), buf []byte) { writer, cancel := _sink() - if writer != nil { _, err := writer.Write(buf) if err != nil { cancel() } + } }(sink, value) } diff --git a/internal/workflow/runtime.go b/internal/workflow/runtime.go index ab082d9..8807141 100644 --- a/internal/workflow/runtime.go +++ b/internal/workflow/runtime.go @@ -5,21 +5,31 @@ import ( "fmt" "io" "log" + "sync" "github.com/yomorun/yomo/internal/conf" "github.com/yomorun/yomo/pkg/quic" ) -var Clients map[string]Client +var FlowClients map[string]Client +var SinkClients map[string]Client +var flowmutex sync.RWMutex +var sinkmutex sync.RWMutex type Client struct { App conf.App - Stream io.ReadWriter + StreamMap map[int64]Stream + QuicClient quic.Client +} + +type Stream struct { + St io.ReadWriter CancelFunc context.CancelFunc } func init() { - Clients = make(map[string]Client) + FlowClients = make(map[string]Client) + SinkClients = make(map[string]Client) } // Run runs quic service @@ -30,29 +40,33 @@ func Run(endpoint string, handle quic.ServerHandler) error { } // Build build the workflow by config (.yaml). -func Build(wfConf *conf.WorkflowConfig) ([]func() (io.ReadWriter, func()), []func() (io.Writer, func())) { +func Build(wfConf *conf.WorkflowConfig, id int64) ([]func() (io.ReadWriter, func()), []func() (io.Writer, func())) { //init workflow flows := make([]func() (io.ReadWriter, func()), 0) sinks := make([]func() (io.Writer, func()), 0) for _, app := range wfConf.Flows { - flows = append(flows, createReadWriter(app)) + flows = append(flows, createReadWriter(app, id)) } for _, app := range wfConf.Sinks { - sinks = append(sinks, createWriter(app)) + sinks = append(sinks, createWriter(app, 0)) } return flows, sinks } -func connectToApp(ctx context.Context, app conf.App) (quic.Stream, error) { +func connectToApp(app conf.App) (quic.Client, error) { client, err := quic.NewClient(fmt.Sprintf("%s:%d", app.Host, app.Port)) if err != nil { log.Print(getConnectFailedMsg(app), err) return nil, err } log.Printf("✅ Connect to %s successfully.", getAppInfo(app)) + return client, err +} + +func createStream(ctx context.Context, client quic.Client) (quic.Stream, error) { return client.CreateStream(ctx) } @@ -68,71 +82,125 @@ func getAppInfo(app conf.App) string { app.Port) } -func createReadWriter(app conf.App) func() (io.ReadWriter, func()) { +func createReadWriter(app conf.App, id int64) func() (io.ReadWriter, func()) { f := func() (io.ReadWriter, func()) { - if Clients[app.Name].Stream != nil { - return Clients[app.Name].Stream, Clients[app.Name].CancelFunc + flowmutex.Lock() + if len(FlowClients[app.Name].StreamMap) > 0 && FlowClients[app.Name].StreamMap[id].St != nil { + flowmutex.Unlock() + return FlowClients[app.Name].StreamMap[id].St, FlowClients[app.Name].StreamMap[id].CancelFunc } - ctx, cancel := context.WithCancel(context.Background()) - stream, err := connectToApp(ctx, app) - if err != nil { - Clients[app.Name] = Client{ + if FlowClients[app.Name].StreamMap == nil || (FlowClients[app.Name].StreamMap != nil && FlowClients[app.Name].QuicClient == nil) { + client, err := connectToApp(app) + + if err != nil { + flowmutex.Unlock() + return nil, nil + } + streammap := make(map[int64]Stream) + FlowClients[app.Name] = Client{ App: app, - Stream: nil, - CancelFunc: cancelStream(cancel, app), + StreamMap: streammap, + QuicClient: client, } - return nil, cancelStream(cancel, app) } - Clients[app.Name] = Client{ - App: app, - Stream: stream, - CancelFunc: cancelStream(cancel, app), + ctx, cancel := context.WithCancel(context.Background()) + stream, err := createStream(ctx, FlowClients[app.Name].QuicClient) + if err != nil { + if err.Error() == "NO_ERROR: No recent network activity" { + FlowClients[app.Name] = Client{ + App: app, + StreamMap: nil, + QuicClient: nil, + } + } + flowmutex.Unlock() + return nil, cancelFlowStream(cancel, app, id) } - - return stream, cancelStream(cancel, app) + FlowClients[app.Name].StreamMap[id] = Stream{ + St: stream, + CancelFunc: cancelFlowStream(cancel, app, id), + } + flowmutex.Unlock() + return stream, cancelFlowStream(cancel, app, id) } return f } -func createWriter(app conf.App) func() (io.Writer, func()) { +func createWriter(app conf.App, id int64) func() (io.Writer, func()) { + f := func() (io.Writer, func()) { - if Clients[app.Name].Stream != nil { - return Clients[app.Name].Stream, Clients[app.Name].CancelFunc + sinkmutex.Lock() + if len(SinkClients[app.Name].StreamMap) > 0 && SinkClients[app.Name].StreamMap[id].St != nil { + sinkmutex.Unlock() + return SinkClients[app.Name].StreamMap[id].St, SinkClients[app.Name].StreamMap[id].CancelFunc } - ctx, cancel := context.WithCancel(context.Background()) - stream, err := connectToApp(ctx, app) - if err != nil { - Clients[app.Name] = Client{ + if SinkClients[app.Name].StreamMap == nil || (SinkClients[app.Name].StreamMap != nil && SinkClients[app.Name].QuicClient == nil) { + client, err := connectToApp(app) + + if err != nil { + sinkmutex.Unlock() + return nil, nil + } + + streammap := make(map[int64]Stream) + SinkClients[app.Name] = Client{ App: app, - Stream: nil, - CancelFunc: cancelStream(cancel, app), + StreamMap: streammap, + QuicClient: client, } - return nil, cancelStream(cancel, app) - } - Clients[app.Name] = Client{ - App: app, - Stream: stream, - CancelFunc: cancelStream(cancel, app), } - return stream, cancelStream(cancel, app) + ctx, cancel := context.WithCancel(context.Background()) + stream, err := createStream(ctx, SinkClients[app.Name].QuicClient) + if err != nil { + if err.Error() == "NO_ERROR: No recent network activity" { + SinkClients[app.Name] = Client{ + App: app, + StreamMap: nil, + QuicClient: nil, + } + } + sinkmutex.Unlock() + return nil, cancelSinkStream(cancel, app, id) + } + SinkClients[app.Name].StreamMap[id] = Stream{ + St: stream, + CancelFunc: cancelSinkStream(cancel, app, id), + } + sinkmutex.Unlock() + return stream, cancelSinkStream(cancel, app, id) } return f } -func cancelStream(cancel context.CancelFunc, app conf.App) func() { +func cancelFlowStream(cancel context.CancelFunc, app conf.App, id int64) func() { + f := func() { + flowmutex.Lock() + if FlowClients[app.Name].StreamMap != nil { + stream := FlowClients[app.Name].StreamMap[id] + stream.St = nil + FlowClients[app.Name].StreamMap[id] = stream + } + flowmutex.Unlock() + } + return f +} + +func cancelSinkStream(cancel context.CancelFunc, app conf.App, id int64) func() { f := func() { - cancel() - Clients[app.Name] = Client{ - App: app, - Stream: nil, + sinkmutex.Lock() + if SinkClients[app.Name].StreamMap != nil { + stream := SinkClients[app.Name].StreamMap[id] + stream.St = nil + SinkClients[app.Name].StreamMap[id] = stream } + sinkmutex.Unlock() } return f } diff --git a/pkg/quic/quic-go.go b/pkg/quic/quic-go.go index 125fb21..f0dbd8d 100644 --- a/pkg/quic/quic-go.go +++ b/pkg/quic/quic-go.go @@ -51,20 +51,29 @@ func (s *quicGoServer) ListenAndServe(ctx context.Context, addr string) error { } for { - session, err := listener.Accept(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + session, err := listener.Accept(ctx) if err != nil { + cancel() return err } - stream, err := session.AcceptStream(context.Background()) - if err != nil { - return err - } - if s.handler != nil { - s.handler.Read(stream) - } else { - log.Print("handler isn't set in QUIC server") - } + go func(session quicGo.Session, cancel context.CancelFunc) { + defer cancel() + + for { + stream, err := session.AcceptStream(context.Background()) + if err != nil { + break + } + if s.handler != nil { + s.handler.Read(stream) + } else { + log.Print("handler isn't set in QUIC server") + break + } + } + }(session, cancel) } } @@ -79,7 +88,7 @@ func (c *quicGoClient) Connect(addr string) error { } session, err := quicGo.DialAddr(addr, tlsConf, &quicGo.Config{ - MaxIdleTimeout: time.Second * 5, + MaxIdleTimeout: 500 * time.Millisecond, KeepAlive: true, MaxIncomingStreams: 1000000, MaxIncomingUniStreams: 1000000, diff --git a/pkg/quic/stream.go b/pkg/quic/stream.go index 7a295d7..e4f1967 100644 --- a/pkg/quic/stream.go +++ b/pkg/quic/stream.go @@ -1,20 +1,8 @@ package quic -import "io" +import "github.com/lucas-clemente/quic-go" // Stream is the QUIC stream type Stream interface { - ReceiveStream - SendStream -} - -// ReceiveStream is an unidirectional Receive Stream. -type ReceiveStream interface { - io.Reader -} - -// A SendStream is an unidirectional Send Stream. -type SendStream interface { - io.Writer - io.Closer + quic.Stream } diff --git a/pkg/rx/rxstream_operator.go b/pkg/rx/rxstream_operator.go index 4ed27d9..3b59c71 100644 --- a/pkg/rx/rxstream_operator.go +++ b/pkg/rx/rxstream_operator.go @@ -1,6 +1,7 @@ package rx import ( + "bytes" "context" "fmt" "io" @@ -9,8 +10,21 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/reactivex/rxgo/v2" y3 "github.com/yomorun/y3-codec-golang" + "github.com/yomorun/yomo/pkg/yy3" ) +type echo struct { + buf *bytes.Buffer +} + +func (e *echo) Write(p []byte) (n int, err error) { + return e.buf.Write(p) +} + +func (e *echo) Read(p []byte) (n int, err error) { + return e.buf.Read(p) +} + func FromChannel(channel chan []byte) RxStream { f := func(ctx context.Context, next chan rxgo.Item) { defer close(next) @@ -55,9 +69,42 @@ func FromReader(reader io.Reader) RxStream { return ConvertObservable(rxgo.FromChannel(next)) } -func FromReaderWithY3(reader io.Reader) RxStream { - source := y3.FromStream(reader) - return ConvertObservableWithY3(source) +func FromReaderWithY3(readers chan io.Reader) RxStream { + f := func(ctx context.Context, next chan rxgo.Item) { + defer close(next) + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-readers: + if !ok { + return + } + r, w := io.Pipe() + if !Of(yy3.FromStream(r)).SendContext(ctx, next) { + return + } + + go func() { + time.Sleep(time.Millisecond) + defer w.Close() + for { + buf := make([]byte, 3*1024) + n, err := item.Read(buf) + + if err != nil { + break + } else { + value := buf[:n] + w.Write(value) + } + } + }() + } + } + } + return CreateObservable(f, rxgo.WithPublishStrategy()) } func FromReaderWithFunc(f func() io.Reader) RxStream { @@ -92,7 +139,6 @@ func Of(i interface{}) rxgo.Item { type RxStreamImpl struct { observable rxgo.Observable - y3 y3.Observable } func (s *RxStreamImpl) All(predicate rxgo.Predicate, opts ...rxgo.Option) RxStream { @@ -544,15 +590,37 @@ func (s *RxStreamImpl) MergeReadWriterWithFunc(rwf func() (io.ReadWriter, func() } func (s *RxStreamImpl) Subscribe(key byte) RxStream { - return ConvertObservableWithY3(s.y3.Subscribe(key)) + + f := func(ctx context.Context, next chan rxgo.Item) { + defer close(next) + observe := s.Observe() + for { + select { + case <-ctx.Done(): + return + case item, ok := <-observe: + if !ok { + return + } + if item.Error() { + return + } + + y3stream := (item.V).(yy3.Observable) + if !Of(y3stream.Subscribe(key)).SendContext(ctx, next) { + return + } + } + } + } + return CreateObservable(f) } func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) RxStream { f := func(ctx context.Context, next chan rxgo.Item) { defer close(next) - observe := s.y3.OnObserve(function) - + observe := s.Observe() for { select { case <-ctx.Done(): @@ -561,10 +629,26 @@ func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) R if !ok { return } - - if !Of(item).SendContext(ctx, next) { + if item.Error() { return } + go func() { + onObserve := (item.V).(yy3.Observable).OnObserve(function) + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-onObserve: + if !ok { + return + } + if !Of(item).SendContext(ctx, next) { + return + } + } + } + }() } } } @@ -618,10 +702,6 @@ func ConvertObservable(observable rxgo.Observable) RxStream { return &RxStreamImpl{observable: observable} } -func ConvertObservableWithY3(observable y3.Observable) RxStream { - return &RxStreamImpl{y3: observable} -} - type infiniteWriter struct { io.Writer } diff --git a/pkg/yy3/yy3.go b/pkg/yy3/yy3.go new file mode 100644 index 0000000..1c9cb23 --- /dev/null +++ b/pkg/yy3/yy3.go @@ -0,0 +1,290 @@ +package yy3 + +import ( + "io" + "sync" + + "github.com/yomorun/y3-codec-golang/pkg/common" +) + +// Iterable iterate through and get the data of observe +type Iterable interface { + Observe() <-chan interface{} +} + +// Observable provide subscription and notification processing +type Observable interface { + Iterable + Subscribe(key byte) Observable + OnObserve(function func(v []byte) (interface{}, error)) chan interface{} +} + +type observableImpl struct { + iterable Iterable +} + +type iterableImpl struct { + next chan interface{} + subscribers []chan interface{} + mutex sync.RWMutex + producerAlreadyCreated bool +} + +func (i *iterableImpl) Observe() <-chan interface{} { + ch := make(chan interface{}) + i.mutex.Lock() + i.subscribers = append(i.subscribers, ch) + i.mutex.Unlock() + i.connect() + return ch +} + +func (i *iterableImpl) connect() { + i.mutex.Lock() + if !i.producerAlreadyCreated { + go i.produce() + i.producerAlreadyCreated = true + } + i.mutex.Unlock() +} + +func (i *iterableImpl) produce() { + defer func() { + i.mutex.RLock() + for _, subscriber := range i.subscribers { + close(subscriber) + } + i.mutex.RUnlock() + }() + + for { + select { + case item, ok := <-i.next: + if !ok { + return + } + i.mutex.RLock() + for _, subscriber := range i.subscribers { + subscriber <- item + } + i.mutex.RUnlock() + } + } +} + +func (o *observableImpl) Observe() <-chan interface{} { + return o.iterable.Observe() +} + +//FromStream reads data from reader +func FromStream(reader io.Reader) Observable { + + f := func(next chan interface{}) { + defer close(next) + for { + buf := make([]byte, 3*1024) + n, err := reader.Read(buf) + + if err != nil { + break + } else { + value := buf[:n] + next <- value + } + } + } + + return createObservable(f) +} + +//Processing callback function when there is data +func (o *observableImpl) OnObserve(function func(v []byte) (interface{}, error)) chan interface{} { + _next := make(chan interface{}) + + f := func(next chan interface{}) { + defer close(next) + + observe := o.Observe() + + for { + select { + case item, ok := <-observe: + if !ok { + return + } + buf := item.([]byte) + value, err := function(buf) + if err != nil { + return + } + + next <- value + } + } + } + + go f(_next) + + return _next +} + +//Get the value of the subscribe key from the stream +func (o *observableImpl) Subscribe(key byte) Observable { + + f := func(next chan interface{}) { + defer close(next) + + buffer := make([]byte, 0) + var ( + index int32 = 0 //vernier + state string = "RS" //RS,RLS,TS,LS,VS,REJECT + length int32 = 0 + value int32 = 0 + limit int32 = 0 + ) + + observe := o.Observe() + + for { + select { + case item, ok := <-observe: + if !ok { + return + } + buf := item.([]byte) + + for i := 0; i < len(buf); i++ { + b := buf[i] + switch state { + case "RS": + if common.IsRootTag(b) { + index++ + state = "RLS" + } else { + buffer = make([]byte, 0) + length = 0 + value = 0 + index = 0 + limit = 0 + } + continue + + case "RLS": + index++ + buffer = append(buffer, b) + l, err := common.DecodeLength(buffer) + + if err != nil { + continue + } + limit = index + l + state = "TS" + buffer = make([]byte, 0) + continue + case "TS": + index++ + buffer = make([]byte, 0) + buffer = append(buffer, b) + state = "LS" + continue + case "LS": + index++ + buffer = append(buffer, b) + l, err := common.DecodeLength(buffer[1:]) + if err != nil { + continue + } + + length = int32(len(buffer[1:])) + value = l + state = "VS" + continue + case "VS": + tail := int32(len(buf[i:])) + buflength := int32(len(buffer)) + + if tail >= ((1 + length + value) - buflength) { + start := i + end := int32(i) + (1 + length + value) - buflength + buffer = append(buffer, buf[start:end]...) + index += ((1 + length + value) - buflength) + i += (int((1+length+value)-buflength) - 1) + //check key + k := (buffer[0] << 2) >> 2 + if k == key { + next <- buffer + if limit == index { + state = "RS" + length = 0 + value = 0 + index = 0 + limit = 0 + buffer = make([]byte, 0) + } else { + state = "REJECT" + } + } else { + + if limit == index { + state = "RS" + length = 0 + value = 0 + index = 0 + limit = 0 + buffer = make([]byte, 0) + } else { + state = "TS" + length = 0 + value = 0 + } + + } + continue + } else { + buffer = append(buffer, buf[i:]...) + index += tail + break + } + case "REJECT": + tail := int32(len(buf[i:])) + if limit == index { + state = "RS" + length = 0 + value = 0 + index = 0 + limit = 0 + buffer = make([]byte, 0) + continue + } else if tail >= (limit - index) { + i += (int(limit-index) - 1) + state = "RS" + length = 0 + value = 0 + index = 0 + limit = 0 + buffer = make([]byte, 0) + continue + } else { + index += tail + break + } + } + } + + } + + } + + } + + return createObservable(f) + +} + +func createObservable(f func(next chan interface{})) Observable { + next := make(chan interface{}) + subscribers := make([]chan interface{}, 0) + + go f(next) + return &observableImpl{iterable: &iterableImpl{next: next, subscribers: subscribers}} +} -- GitLab