未验证 提交 59e55409 编写于 作者: J JJW 提交者: GitHub

feat(rx): add y3Encode (#127)

Co-authored-by: Njjwygjj <wangyaoguang@cel.la>
上级 bb7834f9
...@@ -43,7 +43,8 @@ func Handler(rxstream rx.RxStream) rx.RxStream { ...@@ -43,7 +43,8 @@ func Handler(rxstream rx.RxStream) rx.RxStream {
OnObserve(callback). OnObserve(callback).
Debounce(rxgo.WithDuration(50 * time.Millisecond)). Debounce(rxgo.WithDuration(50 * time.Millisecond)).
Map(printer). Map(printer).
StdOut() StdOut().
Encode(0x10)
return stream return stream
} }
...@@ -25,6 +25,7 @@ func Handler(rxstream rx.RxStream) rx.RxStream { ...@@ -25,6 +25,7 @@ func Handler(rxstream rx.RxStream) rx.RxStream {
Subscribe(0x11). Subscribe(0x11).
OnObserve(callback). OnObserve(callback).
AuditTime(100 * time.Millisecond). AuditTime(100 * time.Millisecond).
Map(store) Map(store).
Encode(0x10)
return stream return stream
} }
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"plugin" "plugin"
"github.com/spf13/cobra" "github.com/spf13/cobra"
y3 "github.com/yomorun/y3-codec-golang"
"github.com/yomorun/yomo/internal/dispatcher" "github.com/yomorun/yomo/internal/dispatcher"
"github.com/yomorun/yomo/internal/serverless" "github.com/yomorun/yomo/internal/serverless"
"github.com/yomorun/yomo/pkg/quic" "github.com/yomorun/yomo/pkg/quic"
...@@ -19,8 +18,6 @@ type RunOptions struct { ...@@ -19,8 +18,6 @@ type RunOptions struct {
baseOptions baseOptions
// Port is the port number of UDP host for Serverless function (default is 4242). // Port is the port number of UDP host for Serverless function (default is 4242).
Port int Port int
mockSink bool
} }
// NewCmdRun creates a new command run. // NewCmdRun creates a new command run.
...@@ -47,7 +44,6 @@ func NewCmdRun() *cobra.Command { ...@@ -47,7 +44,6 @@ func NewCmdRun() *cobra.Command {
endpoint := fmt.Sprintf("0.0.0.0:%d", opts.Port) endpoint := fmt.Sprintf("0.0.0.0:%d", opts.Port)
quicHandler := &quicServerHandler{ quicHandler := &quicServerHandler{
serverlessHandle: slHandler, serverlessHandle: slHandler,
mockSink: opts.mockSink,
} }
err = serverless.Run(endpoint, quicHandler) err = serverless.Run(endpoint, quicHandler)
...@@ -59,14 +55,12 @@ func NewCmdRun() *cobra.Command { ...@@ -59,14 +55,12 @@ func NewCmdRun() *cobra.Command {
cmd.Flags().StringVarP(&opts.Filename, "file-name", "f", "app.go", "Serverless function file (default is app.go)") cmd.Flags().StringVarP(&opts.Filename, "file-name", "f", "app.go", "Serverless function file (default is app.go)")
cmd.Flags().IntVarP(&opts.Port, "port", "p", 4242, "Port is the port number of UDP host for Serverless function (default is 4242)") cmd.Flags().IntVarP(&opts.Port, "port", "p", 4242, "Port is the port number of UDP host for Serverless function (default is 4242)")
cmd.Flags().BoolVar(&opts.mockSink, "mock-sink", false, "Indicates whether the Serverless is a mock sink")
return cmd return cmd
} }
type quicServerHandler struct { type quicServerHandler struct {
serverlessHandle plugin.Symbol serverlessHandle plugin.Symbol
mockSink bool
} }
func (s quicServerHandler) Listen() error { func (s quicServerHandler) Listen() error {
...@@ -76,21 +70,12 @@ func (s quicServerHandler) Listen() error { ...@@ -76,21 +70,12 @@ func (s quicServerHandler) Listen() error {
func (s quicServerHandler) Read(st quic.Stream) error { func (s quicServerHandler) Read(st quic.Stream) error {
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st)) stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st))
y3codec := y3.NewCodec(0x10)
go func() { go func() {
for customer := range stream.Observe() { for customer := range stream.Observe() {
if customer.Error() { if customer.Error() {
fmt.Println(customer.E.Error()) fmt.Println(customer.E.Error())
} else if customer.V != nil { } else if customer.V != nil {
// HACK st.Write((customer.V).([]byte))
if s.mockSink {
st.Write([]byte("Finish sink!"))
} else {
// use Y3 codec to encode the data
sendingBuf, _ := y3codec.Marshal(customer.V)
st.Write(sendingBuf)
}
} }
} }
}() }()
......
...@@ -13,6 +13,7 @@ type RxStream interface { ...@@ -13,6 +13,7 @@ type RxStream interface {
rxgo.Iterable rxgo.Iterable
MergeReadWriterWithFunc(rwf func() (io.ReadWriter, func()), opts ...rxgo.Option) RxStream MergeReadWriterWithFunc(rwf func() (io.ReadWriter, func()), opts ...rxgo.Option) RxStream
Subscribe(key byte) RxStream Subscribe(key byte) RxStream
Encode(key byte, opts ...rxgo.Option) RxStream
OnObserve(function func(v []byte) (interface{}, error)) RxStream OnObserve(function func(v []byte) (interface{}, error)) RxStream
StdOut(opts ...rxgo.Option) RxStream StdOut(opts ...rxgo.Option) RxStream
AuditTime(timespan time.Duration, opts ...rxgo.Option) RxStream AuditTime(timespan time.Duration, opts ...rxgo.Option) RxStream
......
...@@ -571,6 +571,42 @@ func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) R ...@@ -571,6 +571,42 @@ func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) R
return CreateObservable(f) return CreateObservable(f)
} }
func (s *RxStreamImpl) Encode(key byte, opts ...rxgo.Option) RxStream {
y3codec := y3.NewCodec(key)
f := func(ctx context.Context, next chan rxgo.Item) {
defer close(next)
observe := s.Observe(opts...)
for {
select {
case <-ctx.Done():
return
case item, ok := <-observe:
if !ok {
return
}
if item.Error() {
return
}
buf, err := y3codec.Marshal(item.V)
if err != nil {
return
}
if !Of(buf).SendContext(ctx, next) {
return
}
}
}
}
return CreateObservable(f, opts...)
}
func CreateObservable(f func(ctx context.Context, next chan rxgo.Item), opts ...rxgo.Option) RxStream { func CreateObservable(f func(ctx context.Context, next chan rxgo.Item), opts ...rxgo.Option) RxStream {
next := make(chan rxgo.Item) next := make(chan rxgo.Item)
ctx := context.Background() ctx := context.Background()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册