diff --git a/example/flow/app.go b/example/flow/app.go index 523ad844e87a378ba9e24f9e5d6dd1914fcbb784..a6ec851bde4cb1b39063a65ef940068fca6c875b 100644 --- a/example/flow/app.go +++ b/example/flow/app.go @@ -43,7 +43,8 @@ func Handler(rxstream rx.RxStream) rx.RxStream { OnObserve(callback). Debounce(rxgo.WithDuration(50 * time.Millisecond)). Map(printer). - StdOut() + StdOut(). + Encode(0x10) return stream } diff --git a/example/sink/app.go b/example/sink/app.go index 0f38c10a750fc4454a04ab02539e5ce572d22794..c07edfa13a90c8e69e23831b292f5ea83a751aca 100644 --- a/example/sink/app.go +++ b/example/sink/app.go @@ -25,6 +25,7 @@ func Handler(rxstream rx.RxStream) rx.RxStream { Subscribe(0x11). OnObserve(callback). AuditTime(100 * time.Millisecond). - Map(store) + Map(store). + Encode(0x10) return stream } diff --git a/internal/cmd/run.go b/internal/cmd/run.go index a0293f14612f37d1327e1f7306163749b4422510..50e9502604f5e3a0f1ee31abd3d125380ba26a7d 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -7,7 +7,6 @@ import ( "plugin" "github.com/spf13/cobra" - y3 "github.com/yomorun/y3-codec-golang" "github.com/yomorun/yomo/internal/dispatcher" "github.com/yomorun/yomo/internal/serverless" "github.com/yomorun/yomo/pkg/quic" @@ -19,8 +18,6 @@ type RunOptions struct { baseOptions // Port is the port number of UDP host for Serverless function (default is 4242). Port int - - mockSink bool } // NewCmdRun creates a new command run. @@ -47,7 +44,6 @@ func NewCmdRun() *cobra.Command { endpoint := fmt.Sprintf("0.0.0.0:%d", opts.Port) quicHandler := &quicServerHandler{ serverlessHandle: slHandler, - mockSink: opts.mockSink, } err = serverless.Run(endpoint, quicHandler) @@ -59,14 +55,12 @@ func NewCmdRun() *cobra.Command { cmd.Flags().StringVarP(&opts.Filename, "file-name", "f", "app.go", "Serverless function file (default is app.go)") cmd.Flags().IntVarP(&opts.Port, "port", "p", 4242, "Port is the port number of UDP host for Serverless function (default is 4242)") - cmd.Flags().BoolVar(&opts.mockSink, "mock-sink", false, "Indicates whether the Serverless is a mock sink") return cmd } type quicServerHandler struct { serverlessHandle plugin.Symbol - mockSink bool } func (s quicServerHandler) Listen() error { @@ -76,21 +70,12 @@ func (s quicServerHandler) Listen() error { func (s quicServerHandler) Read(st quic.Stream) error { stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st)) - y3codec := y3.NewCodec(0x10) - go func() { for customer := range stream.Observe() { if customer.Error() { fmt.Println(customer.E.Error()) } else if customer.V != nil { - // HACK - if s.mockSink { - st.Write([]byte("Finish sink!")) - } else { - // use Y3 codec to encode the data - sendingBuf, _ := y3codec.Marshal(customer.V) - st.Write(sendingBuf) - } + st.Write((customer.V).([]byte)) } } }() diff --git a/pkg/rx/rxstream.go b/pkg/rx/rxstream.go index 36838161da6305cc1c05a7496bdab47c3fcd4eb7..d672010c5ef96abb6460109eb5f35c78e0aec861 100644 --- a/pkg/rx/rxstream.go +++ b/pkg/rx/rxstream.go @@ -13,6 +13,7 @@ type RxStream interface { rxgo.Iterable MergeReadWriterWithFunc(rwf func() (io.ReadWriter, func()), opts ...rxgo.Option) RxStream Subscribe(key byte) RxStream + Encode(key byte, opts ...rxgo.Option) RxStream OnObserve(function func(v []byte) (interface{}, error)) RxStream StdOut(opts ...rxgo.Option) RxStream AuditTime(timespan time.Duration, opts ...rxgo.Option) RxStream diff --git a/pkg/rx/rxstream_operator.go b/pkg/rx/rxstream_operator.go index 1c0e2869b86176f5c6fe477b10997bb88c9cef54..4ed27d9dbe426a9ff9bf55896eaa74491dd1349c 100644 --- a/pkg/rx/rxstream_operator.go +++ b/pkg/rx/rxstream_operator.go @@ -571,6 +571,42 @@ func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) R return CreateObservable(f) } +func (s *RxStreamImpl) Encode(key byte, opts ...rxgo.Option) RxStream { + y3codec := y3.NewCodec(key) + + f := func(ctx context.Context, next chan rxgo.Item) { + defer close(next) + observe := s.Observe(opts...) + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-observe: + if !ok { + return + } + + if item.Error() { + return + } + + buf, err := y3codec.Marshal(item.V) + + if err != nil { + return + } + + if !Of(buf).SendContext(ctx, next) { + return + } + + } + } + } + return CreateObservable(f, opts...) +} + func CreateObservable(f func(ctx context.Context, next chan rxgo.Item), opts ...rxgo.Option) RxStream { next := make(chan rxgo.Item) ctx := context.Background()